From af5ebe3fdff99a4a92d48e07754bdbb4a791eb78 Mon Sep 17 00:00:00 2001 From: brig Date: Fri, 10 May 2024 18:04:45 -0400 Subject: [PATCH 1/3] runtime-v2: fix itemIndex in parallel loop --- .../runtime/v2/runner/vm/LoopWrapper.java | 10 ++++--- .../concord/runtime/v2/runner/MainTest.java | 21 ++++++++++++++ .../runner/parallelLoopItemIndex/concord.yml | 28 +++++++++++++++++++ 3 files changed, 55 insertions(+), 4 deletions(-) create mode 100644 runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java index f6b694035a..5e95541503 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java @@ -31,6 +31,7 @@ import com.walmartlabs.concord.svm.Runtime; import com.walmartlabs.concord.svm.*; +import java.io.Serial; import java.io.Serializable; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -163,12 +164,14 @@ protected void eval(Runtime runtime, State state, ThreadId threadId, Context ctx int batchSize = toBatchSize(runtime, ctx, parallelism); List> batches = batches(items, batchSize); + int itemIndexStart = 0; for (ArrayList batch : batches) { - evalBatch(state, threadId, batch, outVarsAccumulator); + evalBatch(itemIndexStart, state, threadId, batch, outVarsAccumulator); + itemIndexStart += batch.size(); } } - private void evalBatch(State state, ThreadId threadId, ArrayList items, Map> outVarsAccumulator) { + private void evalBatch(int itemIndexStart, State state, ThreadId threadId, ArrayList items, Map> outVarsAccumulator) { Frame frame = state.peekFrame(threadId); List> forks = items.stream() @@ -177,13 +180,12 @@ private void evalBatch(State state, ThreadId threadId, ArrayList i for (int i = 0; i < forks.size(); i++) { Map.Entry f = forks.get(i); - Frame cmdFrame = Frame.builder() .nonRoot() .build(); cmdFrame.setLocal(CURRENT_ITEMS, items); - cmdFrame.setLocal(CURRENT_INDEX, i); + cmdFrame.setLocal(CURRENT_INDEX, itemIndexStart + i); cmdFrame.setLocal(CURRENT_ITEM, f.getValue()); // fork will create rootFrame for forked commands diff --git a/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java b/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java index c93fd6e595..7aca13ce27 100644 --- a/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java +++ b/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java @@ -1814,6 +1814,27 @@ public void testStringIfExpression() throws Exception { assertLog(log, ".*it's true.*"); } + @Test + public void testParallelLoopItemIndex() throws Exception { + deploy("parallelLoopItemIndex"); + + save(ProcessConfiguration.builder() + .build()); + + byte[] log = run(); + assertLog(log, ".*serial: five==5.*"); + assertLog(log, ".*serial: four==4.*"); + assertLog(log, ".*serial: three==3.*"); + assertLog(log, ".*serial: two==2.*"); + assertLog(log, ".*serial: one==1.*"); + + assertLog(log, ".*parallel: five==5.*"); + assertLog(log, ".*parallel: four==4.*"); + assertLog(log, ".*parallel: three==3.*"); + assertLog(log, ".*parallel: two==2.*"); + assertLog(log, ".*parallel: one==1.*"); + } + private void deploy(String resource) throws URISyntaxException, IOException { Path src = Paths.get(MainTest.class.getResource(resource).toURI()); IOUtils.copy(src, workDir); diff --git a/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml new file mode 100644 index 0000000000..7110b267e2 --- /dev/null +++ b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml @@ -0,0 +1,28 @@ +flows: + default: + # serial + - call: main + in: + item: "${item}" + index: "${itemIndex}" + prefix: "serial" + out: x + loop: + mode: parallel + items: ['one', 'two', 'three', 'four', 'five'] + parallelism: 2 + + # parallel + - call: main + in: + item: "${item}" + index: "${itemIndex}" + prefix: "parallel" + out: x + loop: + mode: parallel + items: ['one', 'two', 'three', 'four', 'five'] + parallelism: 2 + + main: + - log: "${prefix += ': ' += item += '==' += (itemIndex+1)}" \ No newline at end of file From 122844d7f8d1b6414af900d0f60eaf77508364b3 Mon Sep 17 00:00:00 2001 From: brig Date: Fri, 10 May 2024 18:05:24 -0400 Subject: [PATCH 2/3] cleanup --- .../walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java index 5e95541503..622ab40357 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java @@ -31,7 +31,6 @@ import com.walmartlabs.concord.svm.Runtime; import com.walmartlabs.concord.svm.*; -import java.io.Serial; import java.io.Serializable; import java.util.*; import java.util.concurrent.ConcurrentHashMap; From 674191c70286b099a9d8a5702160cce136a9dd72 Mon Sep 17 00:00:00 2001 From: brig Date: Fri, 10 May 2024 18:15:27 -0400 Subject: [PATCH 3/3] cleanup --- .../runtime/v2/runner/parallelLoopItemIndex/concord.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml index 7110b267e2..ec30fa645f 100644 --- a/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml +++ b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml @@ -8,9 +8,8 @@ flows: prefix: "serial" out: x loop: - mode: parallel + mode: serial items: ['one', 'two', 'three', 'four', 'five'] - parallelism: 2 # parallel - call: main