diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java index 02bb3f03f..0895f73a2 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java @@ -58,7 +58,7 @@ static class ConnectionStatus { private static final long MIN_CONNECTION_CHECK_TIMEOUT_MILLIS = 3000; private static final String MONITORING_PROPERTY_PREFIX = "monitoring-"; - private final Queue activeContexts = new ConcurrentLinkedQueue<>(); + protected final Queue activeContexts = new ConcurrentLinkedQueue<>(); private final Queue newContexts = new ConcurrentLinkedQueue<>(); private final PluginService pluginService; private final TelemetryFactory telemetryFactory; @@ -237,15 +237,14 @@ public void run() { this.nodeCheckTimeoutMillis = delayMillis; } - TimeUnit.MILLISECONDS.sleep(delayMillis); + this.sleep(delayMillis); } else { if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano) >= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) { - threadContainer.releaseResource(this); break; } - TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS); + this.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS); } } catch (final InterruptedException intEx) { @@ -279,6 +278,7 @@ public void run() { ex); // We want to print full trace stack of the exception. } } finally { + threadContainer.releaseResource(this); this.stopped = true; if (this.monitoringConn != null) { try { @@ -354,4 +354,11 @@ long getCurrentTimeNano() { public boolean isStopped() { return this.stopped; } + + /** + * Used to help with testing. + */ + protected void sleep(long duration) throws InterruptedException { + TimeUnit.MILLISECONDS.sleep(duration); + } } diff --git a/wrapper/src/test/java/software/amazon/jdbc/plugin/efm/MonitorImplTest.java b/wrapper/src/test/java/software/amazon/jdbc/plugin/efm/MonitorImplTest.java index 76107c994..558e2afba 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/plugin/efm/MonitorImplTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/plugin/efm/MonitorImplTest.java @@ -21,9 +21,11 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -210,4 +212,32 @@ void test_9_runWithContext() { // Clean-up MonitorThreadContainer.releaseInstance(); } + + @Test + void test_10_ensureStoppedMonitorIsRemovedFromMap() throws InterruptedException { + when(contextWithShortInterval.isActiveContext()).thenReturn(true); + when(contextWithShortInterval.getExpectedActiveMonitoringStartTimeNano()).thenReturn(999999999999999L); + doThrow(new InterruptedException("Test")).when(monitor).sleep(anyLong()); + monitor.activeContexts.add(contextWithShortInterval); + final Map monitorMap = threadContainer.getMonitorMap(); + final Map> taskMap = threadContainer.getTasksMap(); + + // Put monitor into container map + final String nodeKey = "monitorA"; + monitorMap.put(nodeKey, monitor); + taskMap.put(monitor, futureResult); + + // Put context + monitor.startMonitoring(contextWithShortInterval); + + // Run monitor + monitor.run(); + + // After running monitor should be out of the map + assertNull(monitorMap.get(nodeKey)); + assertNull(taskMap.get(monitor)); + + // Clean-up + MonitorThreadContainer.releaseInstance(); + } }