From 7a11224a928a4192895f664023966626182bf2d3 Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Mon, 2 Dec 2024 19:00:00 +0530 Subject: [PATCH] Fix concurrency issue in aggregation --- .../synapse/mediators/eip/aggregator/Aggregate.java | 11 ++++++----- .../apache/synapse/mediators/v2/ScatterGather.java | 3 +-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java index cbd7998886..aa8acf2480 100755 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java @@ -115,9 +115,6 @@ public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMilli */ public synchronized boolean addMessage(MessageContext synCtx) { if (maxCount <= 0 || (maxCount > 0 && messages.size() < maxCount)) { - if (messages == null) { - return false; - } messages.add(synCtx); return true; } else { @@ -312,10 +309,14 @@ public void run() { } public synchronized boolean getLock() { - return !locked; + if (!locked) { + locked = true; + return true; + } + return false; } - public void releaseLock() { + public synchronized void releaseLock() { locked = false; } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java index 876183b7fe..c0e5e25f8f 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java @@ -448,7 +448,7 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { } } else { synLog.traceOrDebug("Unable to find aggregation correlation property"); - return true; + return false; } // if there is an aggregate continue on aggregation if (aggregate != null) { @@ -471,7 +471,6 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { } } else { synLog.traceOrDebug("Unable to find an aggregate for this message - skip"); - return true; } return false; }