From fc1f528703e08d233f77e7201f39cc76e8e3c280 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 7 Mar 2025 14:56:32 -0500 Subject: [PATCH] ARTEMIS-5342 Expose AckManager pending records on management --- .../activemq/artemis/logs/AuditLogger.java | 9 +++++ .../management/ActiveMQServerControl.java | 3 ++ .../amqp/connect/mirror/AckManager.java | 19 +++++----- .../impl/ActiveMQServerControlImpl.java | 8 ++++ .../artemis/core/server/ActiveMQServer.java | 5 +++ .../core/server/impl/ActiveMQServerImpl.java | 15 +++++++- .../core/server/mirror/MirrorRegistry.java | 38 +++++++++++++++++++ .../management/ActiveMQServerControlTest.java | 12 ++++++ .../ActiveMQServerControlUsingCoreTest.java | 7 ++++ 9 files changed, 105 insertions(+), 11 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 4b0c1773d09..dbb34f7349b 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2866,4 +2866,13 @@ static void exportConfigAsProperties(Object source) { @LogMessage(id = 601798, value = "User {} is exporting configuration as properties on target resource: {}", level = LogMessage.Level.INFO) void exportConfigAsProperties(String user, Object source); + + static void getPendingMirrorAcks(Object source) { + BASE_LOGGER.getPendingMirrorAcks(getCaller(), source); + } + + @LogMessage(id = 601799, value = "User {} is getting PendingMirrorAcks on target resource: {}", level = LogMessage.Level.INFO) + void getPendingMirrorAcks(String user, Object source); + + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 307ec0af2d4..c744e7aea49 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -2091,6 +2091,9 @@ void replay(@Parameter(name = "startScanDate", desc = "Start date where we will @Attribute(desc = AUTHORIZATION_FAILURE_COUNT) long getAuthorizationFailureCount(); + @Attribute(desc = "Number of pending acknowledgements records on mirroring") + int getPendingMirrorAcks(); + @Operation(desc = "Export the broker configuration as properties", impact = MBeanOperationInfo.ACTION) void exportConfigAsProperties() throws Exception; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 5e3b4b744fd..0ae57bcea32 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.LongSupplier; import io.netty.util.collection.LongObjectHashMap; @@ -56,6 +55,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; @@ -79,11 +79,10 @@ public class AckManager implements ActiveMQComponent { volatile MultiStepProgress progress; ActiveMQScheduledComponent scheduledComponent; - private volatile int size; - private static final AtomicIntegerFieldUpdater sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(AckManager.class, "size"); + final MirrorRegistry mirrorRegistry; public int size() { - return sizeUpdater.get(this); + return mirrorRegistry.getMirrorAckSize(); } public AckManager(ActiveMQServer server) { @@ -92,7 +91,7 @@ public AckManager(ActiveMQServer server) { this.configuration = server.getConfiguration(); this.ioCriticalErrorListener = server.getIoCriticalErrorListener(); this.sequenceGenerator = server.getStorageManager()::generateID; - + this.mirrorRegistry = server.getMirrorRegistry(); // The JournalHashMap has to use the storage manager to guarantee we are using the Replicated Journal Wrapper in case this is a replicated journal journalHashMapProvider = new JournalHashMapProvider<>(sequenceGenerator, server.getStorageManager(), AckRetry.getPersister(), JournalRecordIds.ACK_RETRY, OperationContextImpl::getContext, server.getPostOffice()::findQueue, server.getIoCriticalErrorListener()); this.referenceIDSupplier = new ReferenceIDSupplier(server); @@ -100,7 +99,7 @@ public AckManager(ActiveMQServer server) { public void reload(RecordInfo recordInfo) { journalHashMapProvider.reload(recordInfo); - sizeUpdater.incrementAndGet(this); + mirrorRegistry.incrementMirrorAckSize(); } @Override @@ -323,7 +322,7 @@ private void validateExpireSet(SimpleString address, long queueID, JournalHashMa logger.debug("Retried {} {} times, giving up on the entry now. Configured Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } if (retries.remove(retry) != null) { - sizeUpdater.decrementAndGet(AckManager.this); + mirrorRegistry.decrementMirrorAckSize(); } } else { if (logger.isDebugEnabled()) { @@ -379,7 +378,7 @@ private void retryPage(LongObjectHashMap recordsLoader); + + MirrorRegistry getMirrorRegistry(); + + int getPendingMirrorAcks(); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index aeb1a889815..082adebc7af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -170,6 +170,7 @@ import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; @@ -259,6 +260,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { private HAPolicy haPolicy; + private MirrorRegistry mirrorRegistry = new MirrorRegistry(); + // This will be useful on tests or embedded private boolean rebuildCounters = true; @@ -4821,4 +4824,14 @@ public AutoCloseable managementLock() throws Exception { public IOCriticalErrorListener getIoCriticalErrorListener() { return ioCriticalErrorListener; } -} + + @Override + public MirrorRegistry getMirrorRegistry() { + return mirrorRegistry; + } + + @Override + public int getPendingMirrorAcks() { + return mirrorRegistry.getMirrorAckSize(); + } +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java new file mode 100644 index 00000000000..fb8106a7751 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.mirror; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +public class MirrorRegistry { + + private volatile int mirrorAckSize; + private static final AtomicIntegerFieldUpdater sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(MirrorRegistry.class, "mirrorAckSize"); + + public int getMirrorAckSize() { + return sizeUpdater.get(this); + } + + public void incrementMirrorAckSize() { + sizeUpdater.incrementAndGet(this); + } + + public void decrementMirrorAckSize() { + sizeUpdater.decrementAndGet(this); + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 1581f300083..138687b803d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -250,6 +250,18 @@ public void testGetAttributes() throws Exception { assertTrue(serverControl.isActive()); } + @TestTemplate + public void testPendingMirrorAcks() throws Exception { + ActiveMQServerControl serverControl = createManagementControl(); + // faking some data, to make sure we are not just returning a default value + for (int i = 0; i < 7; i++) { + server.getMirrorRegistry().incrementMirrorAckSize(); + } + + assertEquals(7, serverControl.getPendingMirrorAcks()); + assertEquals(7, server.getPendingMirrorAcks()); + } + @TestTemplate public void testBrokerPluginClassNames() throws Exception { ActiveMQServerControl serverControl = createManagementControl(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 4464cac8f9d..b4c20382174 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; +import org.apache.activemq.artemis.api.core.management.Attribute; import org.apache.activemq.artemis.api.core.management.Parameter; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension; @@ -1857,6 +1858,12 @@ public long getAuthorizationFailureCount() { return (long) proxy.retrieveAttributeValue("authorizationFailureCount"); } + + @Attribute(desc = "Number of pending acknowledgements records on mirroring") + public int getPendingMirrorAcks() { + return ((Number) proxy.retrieveAttributeValue("pendingMirrorAcks")).intValue(); + } + @Override public void exportConfigAsProperties() throws Exception { proxy.invokeOperation("exportConfigAsProperties");