Skip to content

Commit

Permalink
fix: releasae all stopped monitors
Browse files Browse the repository at this point in the history
  • Loading branch information
crystall-bitquill committed Jan 12, 2024
1 parent 774aaee commit b849610
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static class ConnectionStatus {
private static final long MIN_CONNECTION_CHECK_TIMEOUT_MILLIS = 3000;
private static final String MONITORING_PROPERTY_PREFIX = "monitoring-";

private final Queue<MonitorConnectionContext> activeContexts = new ConcurrentLinkedQueue<>();
protected final Queue<MonitorConnectionContext> activeContexts = new ConcurrentLinkedQueue<>();
private final Queue<MonitorConnectionContext> newContexts = new ConcurrentLinkedQueue<>();
private final PluginService pluginService;
private final TelemetryFactory telemetryFactory;
Expand Down Expand Up @@ -237,15 +237,14 @@ public void run() {
this.nodeCheckTimeoutMillis = delayMillis;
}

TimeUnit.MILLISECONDS.sleep(delayMillis);
this.sleep(delayMillis);

} else {
if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
>= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
threadContainer.releaseResource(this);
break;
}
TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
this.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
}

} catch (final InterruptedException intEx) {
Expand Down Expand Up @@ -279,6 +278,7 @@ public void run() {
ex); // We want to print full trace stack of the exception.
}
} finally {
threadContainer.releaseResource(this);
this.stopped = true;
if (this.monitoringConn != null) {
try {
Expand Down Expand Up @@ -354,4 +354,11 @@ long getCurrentTimeNano() {
public boolean isStopped() {
return this.stopped;
}

/**
* Used to help with testing.
*/
protected void sleep(long duration) throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(duration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -210,4 +212,32 @@ void test_9_runWithContext() {
// Clean-up
MonitorThreadContainer.releaseInstance();
}

@Test
void test_10_ensureStoppedMonitorIsRemovedFromMap() throws InterruptedException {
when(contextWithShortInterval.isActiveContext()).thenReturn(true);
when(contextWithShortInterval.getExpectedActiveMonitoringStartTimeNano()).thenReturn(999999999999999L);
doThrow(new InterruptedException("Test")).when(monitor).sleep(anyLong());
monitor.activeContexts.add(contextWithShortInterval);
final Map<String, Monitor> monitorMap = threadContainer.getMonitorMap();
final Map<Monitor, Future<?>> taskMap = threadContainer.getTasksMap();

// Put monitor into container map
final String nodeKey = "monitorA";
monitorMap.put(nodeKey, monitor);
taskMap.put(monitor, futureResult);

// Put context
monitor.startMonitoring(contextWithShortInterval);

// Run monitor
monitor.run();

// After running monitor should be out of the map
assertNull(monitorMap.get(nodeKey));
assertNull(taskMap.get(monitor));

// Clean-up
MonitorThreadContainer.releaseInstance();
}
}

0 comments on commit b849610

Please sign in to comment.