diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java index f6b694035a..622ab40357 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/LoopWrapper.java @@ -163,12 +163,14 @@ protected void eval(Runtime runtime, State state, ThreadId threadId, Context ctx int batchSize = toBatchSize(runtime, ctx, parallelism); List> batches = batches(items, batchSize); + int itemIndexStart = 0; for (ArrayList batch : batches) { - evalBatch(state, threadId, batch, outVarsAccumulator); + evalBatch(itemIndexStart, state, threadId, batch, outVarsAccumulator); + itemIndexStart += batch.size(); } } - private void evalBatch(State state, ThreadId threadId, ArrayList items, Map> outVarsAccumulator) { + private void evalBatch(int itemIndexStart, State state, ThreadId threadId, ArrayList items, Map> outVarsAccumulator) { Frame frame = state.peekFrame(threadId); List> forks = items.stream() @@ -177,13 +179,12 @@ private void evalBatch(State state, ThreadId threadId, ArrayList i for (int i = 0; i < forks.size(); i++) { Map.Entry f = forks.get(i); - Frame cmdFrame = Frame.builder() .nonRoot() .build(); cmdFrame.setLocal(CURRENT_ITEMS, items); - cmdFrame.setLocal(CURRENT_INDEX, i); + cmdFrame.setLocal(CURRENT_INDEX, itemIndexStart + i); cmdFrame.setLocal(CURRENT_ITEM, f.getValue()); // fork will create rootFrame for forked commands diff --git a/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java b/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java index c93fd6e595..7aca13ce27 100644 --- a/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java +++ b/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/MainTest.java @@ -1814,6 +1814,27 @@ public void testStringIfExpression() throws Exception { assertLog(log, ".*it's true.*"); } + @Test + public void testParallelLoopItemIndex() throws Exception { + deploy("parallelLoopItemIndex"); + + save(ProcessConfiguration.builder() + .build()); + + byte[] log = run(); + assertLog(log, ".*serial: five==5.*"); + assertLog(log, ".*serial: four==4.*"); + assertLog(log, ".*serial: three==3.*"); + assertLog(log, ".*serial: two==2.*"); + assertLog(log, ".*serial: one==1.*"); + + assertLog(log, ".*parallel: five==5.*"); + assertLog(log, ".*parallel: four==4.*"); + assertLog(log, ".*parallel: three==3.*"); + assertLog(log, ".*parallel: two==2.*"); + assertLog(log, ".*parallel: one==1.*"); + } + private void deploy(String resource) throws URISyntaxException, IOException { Path src = Paths.get(MainTest.class.getResource(resource).toURI()); IOUtils.copy(src, workDir); diff --git a/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml new file mode 100644 index 0000000000..ec30fa645f --- /dev/null +++ b/runtime/v2/runner/src/test/resources/com/walmartlabs/concord/runtime/v2/runner/parallelLoopItemIndex/concord.yml @@ -0,0 +1,27 @@ +flows: + default: + # serial + - call: main + in: + item: "${item}" + index: "${itemIndex}" + prefix: "serial" + out: x + loop: + mode: serial + items: ['one', 'two', 'three', 'four', 'five'] + + # parallel + - call: main + in: + item: "${item}" + index: "${itemIndex}" + prefix: "parallel" + out: x + loop: + mode: parallel + items: ['one', 'two', 'three', 'four', 'five'] + parallelism: 2 + + main: + - log: "${prefix += ': ' += item += '==' += (itemIndex+1)}" \ No newline at end of file