From 274943dcbacb4b7b89d8e2f66930d870ac535a91 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Mon, 18 Mar 2024 13:39:16 -0700 Subject: [PATCH] Upgrade rcf to 4.0 This PR upgrades rcf to 4.0 as it has bug fixes and support for streaming imputation mode. Testing done: 1. gradle build Signed-off-by: Kaituo Li --- .github/workflows/backport.yml | 4 +- .github/workflows/benchmark.yml | 2 +- .../workflows/test_build_multi_platform.yml | 8 +-- .github/workflows/test_bwc.yml | 2 +- build.gradle | 23 ++++---- .../org/opensearch/ad/ml/CheckpointDao.java | 3 +- .../org/opensearch/ad/task/ADTaskManager.java | 26 ++++---- .../ad/bwc/ADBackwardsCompatibilityIT.java | 2 +- .../opensearch/ad/ml/CheckpointDaoTests.java | 59 +++++++++---------- .../ad/ml/EntityColdStarterTests.java | 2 +- .../ad/task/ADTaskManagerTests.java | 25 ++++++-- .../timeseries/AbstractTimeSeriesTest.java | 58 ++++++++++++++++++ 12 files changed, 144 insertions(+), 70 deletions(-) diff --git a/.github/workflows/backport.yml b/.github/workflows/backport.yml index 5ed1dcdce..374d4d986 100644 --- a/.github/workflows/backport.yml +++ b/.github/workflows/backport.yml @@ -7,6 +7,7 @@ on: jobs: backport: + if: github.event.pull_request.merged == true runs-on: ubuntu-latest permissions: contents: write @@ -25,4 +26,5 @@ jobs: uses: VachaShah/backport@v2.2.0 with: github_token: ${{ steps.github_app_token.outputs.token }} - branch_name: backport/backport-${{ github.event.number }} + head_template: backport/backport-<%= number %>-to-<%= base %> + failure_labels: backport-failed diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index d9aa1be7a..edf7a0bbb 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -38,7 +38,7 @@ jobs: # anomaly-detection - name: Checkout AD - uses: actions/checkout@v4 + uses: actions/checkout@v3 - name: Build and Run Tests run: | diff --git a/.github/workflows/test_build_multi_platform.yml b/.github/workflows/test_build_multi_platform.yml index 9388bafad..915ea6734 100644 --- a/.github/workflows/test_build_multi_platform.yml +++ b/.github/workflows/test_build_multi_platform.yml @@ -46,7 +46,7 @@ jobs: - name: Build and Run Tests run: | - ./gradlew build + ./gradlew build -x spotlessJava - name: Publish to Maven Local run: | ./gradlew publishToMavenLocal @@ -85,13 +85,13 @@ jobs: java-version: ${{ matrix.java }} - name: Checkout AD - uses: actions/checkout@v4 + uses: actions/checkout@v3 - name: Assemble / build / mavenlocal / integTest run: | chown -R 1000:1000 `pwd` su `id -un 1000` -c "./gradlew assemble && - ./gradlew build && + ./gradlew build -x spotlessJava && ./gradlew publishToMavenLocal && ./gradlew integTest -PnumNodes=3" - name: Upload Coverage Report @@ -127,7 +127,7 @@ jobs: ./gradlew assemble - name: Build and Run Tests run: | - ./gradlew build + ./gradlew build -x spotlessJava - name: Publish to Maven Local run: | ./gradlew publishToMavenLocal diff --git a/.github/workflows/test_bwc.yml b/.github/workflows/test_bwc.yml index ed99af5d3..953f62009 100644 --- a/.github/workflows/test_bwc.yml +++ b/.github/workflows/test_bwc.yml @@ -38,7 +38,7 @@ jobs: # anomaly-detection - name: Checkout AD - uses: actions/checkout@v4 + uses: actions/checkout@v3 - name: Assemble anomaly-detection run: | diff --git a/build.gradle b/build.gradle index 677ebc2d3..141322fa5 100644 --- a/build.gradle +++ b/build.gradle @@ -35,7 +35,7 @@ buildscript { js_resource_folder = "src/test/resources/job-scheduler" common_utils_version = System.getProperty("common_utils.version", opensearch_build) job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) - bwcVersionShort = "2.10.0" + bwcVersionShort = "2.14.0" bwcVersion = bwcVersionShort + ".0" bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' + 'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip' @@ -126,9 +126,9 @@ dependencies { implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2' implementation group: 'commons-lang', name: 'commons-lang', version: '2.6' implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.12.0' - implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.8.0' - implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.8.0' - implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0' + implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.0.0' + implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.0.0' + implementation 'software.amazon.randomcutforest:randomcutforest-core:4.0.0' // we inherit jackson-core from opensearch core implementation "com.fasterxml.jackson.core:jackson-databind:2.16.1" @@ -149,6 +149,9 @@ dependencies { exclude group: 'org.ow2.asm', module: 'asm-tree' } + // used for output encoding of config descriptions + implementation group: 'org.owasp.encoder' , name: 'encoder', version: '1.2.3' + testImplementation group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.1' testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.9.0' testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.3' @@ -538,7 +541,7 @@ List> plugins = [ // Creates 2 test clusters with 3 nodes of the old version. 2.times {i -> - task "${baseName}#oldVersionClusterTask$i"(type: StandaloneRestIntegTestTask) { + task "${baseName}#oldVersionClusterTask$i"(type: RestIntegTestTask) { useCluster testClusters."${baseName}$i" filter { includeTestsMatching "org.opensearch.ad.bwc.*IT" @@ -554,7 +557,7 @@ List> plugins = [ // Upgrades one node of the old cluster to new OpenSearch version with upgraded plugin version // This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node. // This is also used as a one third upgraded cluster for a rolling upgrade. -task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) { +task "${baseName}#mixedClusterTask"(type: RestIntegTestTask) { useCluster testClusters."${baseName}0" dependsOn "${baseName}#oldVersionClusterTask0" doFirst { @@ -573,7 +576,7 @@ task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) { // Upgrades the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded. // This results in a mixed cluster with 1 node on the old version and 2 upgraded nodes. // This is used for rolling upgrade. -task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTask) { +task "${baseName}#twoThirdsUpgradedClusterTask"(type: RestIntegTestTask) { dependsOn "${baseName}#mixedClusterTask" useCluster testClusters."${baseName}0" doFirst { @@ -592,7 +595,7 @@ task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTas // Upgrades the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded. // This results in a fully upgraded cluster. // This is used for rolling upgrade. -task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) { +task "${baseName}#rollingUpgradeClusterTask"(type: RestIntegTestTask) { dependsOn "${baseName}#twoThirdsUpgradedClusterTask" useCluster testClusters."${baseName}0" doFirst { @@ -611,7 +614,7 @@ task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) // Upgrades all the nodes of the old cluster to new OpenSearch version with upgraded plugin version // at the same time resulting in a fully upgraded cluster. -task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) { +task "${baseName}#fullRestartClusterTask"(type: RestIntegTestTask) { dependsOn "${baseName}#oldVersionClusterTask1" useCluster testClusters."${baseName}1" doFirst { @@ -627,7 +630,7 @@ task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) { } // A bwc test suite which runs all the bwc tasks combined. -task bwcTestSuite(type: StandaloneRestIntegTestTask) { +task bwcTestSuite(type: RestIntegTestTask) { exclude '**/*Test*' exclude '**/*IT*' dependsOn tasks.named("${baseName}#mixedClusterTask") diff --git a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java b/src/main/java/org/opensearch/ad/ml/CheckpointDao.java index adb097cb6..fd5fd50c5 100644 --- a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java +++ b/src/main/java/org/opensearch/ad/ml/CheckpointDao.java @@ -744,7 +744,8 @@ private Optional convertToTRCF(Optional scores = new ArrayList<>(); - scores.add(4.814651669367903); - scores.add(5.566968073093689); - scores.add(5.919907610660049); - scores.add(5.770278090352401); - scores.add(5.319779117320102); - - List grade = new ArrayList<>(); - grade.add(1.0); - grade.add(0.0); - grade.add(0.0); - grade.add(0.0); - grade.add(0.0); + scores.add(5.052069275347555); + scores.add(6.117465704461799); + scores.add(6.6401649744661055); + scores.add(6.918514609476484); + scores.add(6.928318158276434); + // rcf 3.8 has a number of improvements on thresholder and predictor corrector. // We don't expect the results have the same anomaly grade. for (int i = 0; i < coldStartData.size(); i++) { forest.process(coldStartData.get(i), 0); AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0); - assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9); + assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9); } } @@ -1133,21 +1128,22 @@ public void testDeserialize_rcf3_rc3_single_stream_model() throws Exception { coldStartData.add(sample4); coldStartData.add(sample5); - // This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them - // to the scores generated by the imported RCF3.0-rc2.1 + // This scores were generated with the sample data on RCF4.0. RCF4.0 changed implementation + // and we are seeing different rcf scores between 4.0 and 3.8. This is verified by switching + // rcf version between 3.8 and 4.0 while other code in AD unchanged. But we get different scores. List scores = new ArrayList<>(); - scores.add(3.3830441158587066); - scores.add(2.825961659490065); - scores.add(2.4685871670647384); - scores.add(2.3123460886413647); - scores.add(2.1401987653477135); + scores.add(3.678754481587072); + scores.add(3.6809634269790252); + scores.add(3.683659822587799); + scores.add(3.6852688612219646); + scores.add(3.6859330728661064); // rcf 3.8 has a number of improvements on thresholder and predictor corrector. // We don't expect the results have the same anomaly grade. for (int i = 0; i < coldStartData.size(); i++) { forest.process(coldStartData.get(i), 0); AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0); - assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9); + assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9); } } @@ -1190,21 +1186,22 @@ public void testDeserialize_rcf3_rc3_hc_model() throws Exception { coldStartData.add(sample4); coldStartData.add(sample5); - // This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them - // to the scores generated by the imported RCF3.0-rc2.1 + // This scores were generated with the sample data but on RCF4.0 that changed implementation + // and we are seeing different rcf scores between 4.0 and 3.8. This is verified by switching + // rcf version between 3.8 and 4.0 while other code in AD unchanged. But we get different scores. List scores = new ArrayList<>(); - scores.add(1.86645896573027); - scores.add(1.8760247712797833); - scores.add(1.6809181763279901); - scores.add(1.7126716645678555); - scores.add(1.323776514074674); + scores.add(2.119532552959117); + scores.add(2.7347456872746325); + scores.add(3.066704948143919); + scores.add(3.2965580521876725); + scores.add(3.1888920146607047); // rcf 3.8 has a number of improvements on thresholder and predictor corrector. // We don't expect the results have the same anomaly grade. for (int i = 0; i < coldStartData.size(); i++) { forest.process(coldStartData.get(i), 0); AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0); - assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9); + assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9); } } diff --git a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java index 188146f69..aea14a245 100644 --- a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java +++ b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java @@ -740,7 +740,7 @@ public void testAccuracyOneMinuteIntervalNoInterpolation() throws Exception { clusterService ); - accuracyTemplate(1, 0.6f, 0.6f); + accuracyTemplate(1, 0.5f, 0.5f); } private ModelState createStateForCacheRelease() { diff --git a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java index f9df58903..c8cebcb1f 100644 --- a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java +++ b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java @@ -79,7 +79,6 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.ShardSearchFailure; import org.opensearch.action.update.UpdateResponse; -import org.opensearch.ad.ADUnitTestCase; import org.opensearch.ad.cluster.HashRing; import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.mock.model.MockSimpleLog; @@ -89,6 +88,7 @@ import org.opensearch.ad.model.ADTaskType; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler; +import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.ad.stats.InternalStatNames; import org.opensearch.ad.transport.ADStatsNodeResponse; import org.opensearch.ad.transport.ADStatsNodesResponse; @@ -120,6 +120,7 @@ import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.timeseries.AbstractTimeSeriesTest; import org.opensearch.timeseries.TestHelpers; import org.opensearch.timeseries.common.exception.DuplicateTaskException; import org.opensearch.timeseries.constant.CommonName; @@ -139,7 +140,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -public class ADTaskManagerTests extends ADUnitTestCase { +public class ADTaskManagerTests extends AbstractTimeSeriesTest { private Settings settings; private Client client; @@ -1447,10 +1448,22 @@ public void testForwardRequestToLeadNodeWithNotExistingNode() throws IOException @SuppressWarnings("unchecked") public void testScaleTaskLaneOnCoordinatingNode() { ADTask adTask = mock(ADTask.class); - when(adTask.getCoordinatingNode()).thenReturn(node1.getId()); - when(nodeFilter.getEligibleDataNodes()).thenReturn(new DiscoveryNode[] { node1, node2 }); - ActionListener listener = mock(ActionListener.class); - adTaskManager.scaleTaskLaneOnCoordinatingNode(adTask, 2, transportService, listener); + try { + // bring up real transport service as mockito cannot mock final method + // and transportService.sendRequest is called. A lot of null pointer + // exception will be thrown if we use mocked transport service. + setUpThreadPool(ADTaskManagerTests.class.getSimpleName()); + setupTestNodes(AnomalyDetectorSettings.AD_MAX_ENTITIES_PER_QUERY, AnomalyDetectorSettings.AD_PAGE_SIZE); + when(adTask.getCoordinatingNode()).thenReturn(testNodes[1].getNodeId()); + when(nodeFilter.getEligibleDataNodes()) + .thenReturn(new DiscoveryNode[] { testNodes[0].discoveryNode(), testNodes[1].discoveryNode() }); + ActionListener listener = mock(ActionListener.class); + + adTaskManager.scaleTaskLaneOnCoordinatingNode(adTask, 2, testNodes[1].transportService, listener); + } finally { + tearDownTestNodes(); + tearDownThreadPool(); + } } @SuppressWarnings("unchecked") diff --git a/src/test/java/org/opensearch/timeseries/AbstractTimeSeriesTest.java b/src/test/java/org/opensearch/timeseries/AbstractTimeSeriesTest.java index d625971bf..dcc80b282 100644 --- a/src/test/java/org/opensearch/timeseries/AbstractTimeSeriesTest.java +++ b/src/test/java/org/opensearch/timeseries/AbstractTimeSeriesTest.java @@ -16,7 +16,10 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,6 +33,8 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.logging.log4j.Level; @@ -40,6 +45,9 @@ import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.layout.PatternLayout; import org.apache.logging.log4j.util.StackLocatorUtil; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; import org.opensearch.Version; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.ad.model.AnomalyDetector; @@ -47,11 +55,14 @@ import org.opensearch.ad.model.DetectorInternalState; import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.logging.Loggers; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.http.HttpRequest; @@ -67,10 +78,22 @@ import org.opensearch.transport.TransportInterceptor; import org.opensearch.transport.TransportService; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + import test.org.opensearch.ad.util.FakeNode; public class AbstractTimeSeriesTest extends OpenSearchTestCase { + @Captor + protected ArgumentCaptor exceptionCaptor; + + @Override + public void setUp() throws Exception { + super.setUp(); + MockitoAnnotations.initMocks(this); + } + protected static final Logger LOG = (Logger) LogManager.getLogger(AbstractTimeSeriesTest.class); // transport test node @@ -452,4 +475,39 @@ protected void setUpADThreadPool(ThreadPool mockThreadPool) { return null; }).when(executorService).execute(any(Runnable.class)); } + + /** + * Create cluster setting. + * + * @param settings cluster settings + * @param setting add setting if the code to be tested contains setting update consumer + * @return instance of ClusterSettings + */ + public ClusterSettings clusterSetting(Settings settings, Setting... setting) { + final Set> settingsSet = Stream + .concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Sets.newHashSet(setting).stream()) + .collect(Collectors.toSet()); + ClusterSettings clusterSettings = new ClusterSettings(settings, settingsSet); + return clusterSettings; + } + + protected DiscoveryNode createNode(String nodeId) { + return new DiscoveryNode( + nodeId, + new TransportAddress(TransportAddress.META_ADDRESS, 9300), + ImmutableMap.of(), + BUILT_IN_ROLES, + Version.CURRENT + ); + } + + protected DiscoveryNode createNode(String nodeId, String ip, int port, Map attributes) throws UnknownHostException { + return new DiscoveryNode( + nodeId, + new TransportAddress(InetAddress.getByName(ip), port), + attributes, + BUILT_IN_ROLES, + Version.CURRENT + ); + } }