From fb4a13e61b503c6b43226cf97c860a936cb62315 Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Tue, 3 Sep 2024 09:45:05 -0700 Subject: [PATCH] Adding remote index and multi index checks in validation Signed-off-by: Amit Galitzky --- .../timeseries/constant/CommonMessages.java | 4 + .../timeseries/rest/RestValidateAction.java | 1 - .../AbstractTimeSeriesActionHandler.java | 218 ++++++++++++------ .../BaseValidateConfigTransportAction.java | 6 +- .../util/CrossClusterConfigUtils.java | 102 ++++++++ .../MultiResponsesDelegateActionListener.java | 5 +- ...stractForecasterActionHandlerTestCase.java | 5 + ...ndexAnomalyDetectorActionHandlerTests.java | 5 + .../IndexForecasterActionHandlerTests.java | 9 +- ...dateAnomalyDetectorActionHandlerTests.java | 6 +- ...teAnomalyDetectorTransportActionTests.java | 16 +- .../util/CrossClusterConfigUtilsTests.java | 66 ++++++ 12 files changed, 349 insertions(+), 94 deletions(-) create mode 100644 src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java create mode 100644 src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java index 059947f91..16aa3fda2 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java @@ -37,6 +37,7 @@ public static String getTooManyCategoricalFieldErr(int limit) { "Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and _(underscore)"; public static String FAIL_TO_VALIDATE = "failed to validate"; public static String INVALID_TIMESTAMP = "Timestamp field: (%s) must be of type date"; + public static String NON_EXISTENT_TIMESTAMP_IN_INDEX = "Timestamp field: (%s) is not found in the (%s) index mapping"; public static String NON_EXISTENT_TIMESTAMP = "Timestamp field: (%s) is not found in index mapping"; public static String INVALID_NAME = "Valid characters for name are a-z, A-Z, 0-9, -(hyphen), _(underscore) and .(period)"; // change this error message to make it compatible with old version's integration(nexus) test @@ -74,6 +75,9 @@ public static String getTooManyCategoricalFieldErr(int limit) { + " characters."; public static final String INDEX_NOT_FOUND = "index does not exist"; public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s"; + public static final String FAIL_TO_GET_MAPPING = "Fail to get the index mapping"; + public static final String TIMESTAMP_VALIDATION_FAILED = "Validation failed for timefield of %s "; + public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config"; // ====================================== diff --git a/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java b/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java index fa546c3d9..e9da98c8d 100644 --- a/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java +++ b/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java @@ -84,7 +84,6 @@ private Boolean validationTypesAreAccepted(String validationType) { public ValidateConfigRequest prepareRequest(RestRequest request, NodeClient client, String typesStr) throws IOException { XContentParser parser = request.contentParser(); ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - // if type param isn't blank and isn't a part of possible validation types throws exception if (!StringUtils.isBlank(typesStr)) { if (!validationTypesAreAccepted(typesStr)) { diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java index bba0a4f09..deb7fed0a 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java @@ -7,21 +7,14 @@ import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.timeseries.constant.CommonMessages.CATEGORICAL_FIELD_TYPE_ERR_MSG; +import static org.opensearch.timeseries.constant.CommonMessages.TIMESTAMP_VALIDATION_FAILED; import static org.opensearch.timeseries.util.ParseUtils.parseAggregators; import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE; import static org.opensearch.timeseries.util.RestHandlerUtils.isExceptionCausedByInvalidQuery; import java.io.IOException; import java.time.Clock; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; @@ -76,10 +69,7 @@ import org.opensearch.timeseries.model.ValidationIssueType; import org.opensearch.timeseries.task.TaskCacheManager; import org.opensearch.timeseries.task.TaskManager; -import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener; -import org.opensearch.timeseries.util.ParseUtils; -import org.opensearch.timeseries.util.RestHandlerUtils; -import org.opensearch.timeseries.util.SecurityClientUtil; +import org.opensearch.timeseries.util.*; import org.opensearch.transport.TransportService; import com.google.common.collect.Sets; @@ -241,7 +231,6 @@ public void start(ActionListener listener) { createOrUpdateConfig(listener); return; } - if (this.isDryRun) { if (timeSeriesIndices.doesIndexExist(resultIndexOrAlias) || timeSeriesIndices.doesAliasExist(resultIndexOrAlias)) { timeSeriesIndices @@ -304,64 +293,110 @@ protected void validateName(boolean indexingDryRun, ActionListener listener) protected void validateTimeField(boolean indexingDryRun, ActionListener listener) { String givenTimeField = config.getTimeField(); - GetFieldMappingsRequest getMappingsRequest = new GetFieldMappingsRequest(); - getMappingsRequest.indices(config.getIndices().toArray(new String[0])).fields(givenTimeField); - getMappingsRequest.indicesOptions(IndicesOptions.strictExpand()); - - // comments explaining fieldMappingResponse parsing can be found inside validateCategoricalField(String, boolean) - ActionListener mappingsListener = ActionListener.wrap(getMappingsResponse -> { - boolean foundField = false; - Map> mappingsByIndex = getMappingsResponse.mappings(); + HashMap> clusterIndicesMap = CrossClusterConfigUtils + .separateClusterIndexes(config.getIndices(), clusterService); - for (Map mappingsByField : mappingsByIndex.values()) { - for (Map.Entry field2Metadata : mappingsByField.entrySet()) { + ActionListener>> validateGetMappingForTimeFieldListener = ActionListener.wrap(response -> { + prepareConfigIndexing(indexingDryRun, listener); + }, exception -> { listener.onFailure(createValidationException(exception.getMessage(), ValidationIssueType.TIMEFIELD_FIELD)); }); + MultiResponsesDelegateActionListener>> multiGetMappingResponseListener = + new MultiResponsesDelegateActionListener<>( + validateGetMappingForTimeFieldListener, + clusterIndicesMap.entrySet().size(), + String.format(Locale.ROOT, TIMESTAMP_VALIDATION_FAILED, config.getName()), + false + ); - GetFieldMappingsResponse.FieldMappingMetadata fieldMetadata = field2Metadata.getValue(); - if (fieldMetadata != null) { - // sourceAsMap returns sth like {host2={type=keyword}} with host2 being a nested field - Map fieldMap = fieldMetadata.sourceAsMap(); - if (fieldMap != null) { - for (Object type : fieldMap.values()) { - if (type instanceof Map) { - foundField = true; - Map metadataMap = (Map) type; - String typeName = (String) metadataMap.get(CommonName.TYPE); - if (!typeName.equals(CommonName.DATE_TYPE) && !typeName.equals(CommonName.DATE_NANOS_TYPE)) { - listener - .onFailure( - new ValidationException( - String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, givenTimeField), - ValidationIssueType.TIMEFIELD_FIELD, - configValidationAspect - ) - ); - return; + for (Map.Entry> clusterIndicesEntry : clusterIndicesMap.entrySet()) { + GetFieldMappingsRequest getMappingsRequestForIndex = new GetFieldMappingsRequest(); + getMappingsRequestForIndex.indices((clusterIndicesEntry.getValue().toArray(new String[0]))).fields(givenTimeField); + getMappingsRequestForIndex.indicesOptions(IndicesOptions.strictExpand()); + Client targetClusterClient = CrossClusterConfigUtils.getClientForCluster(clusterIndicesEntry.getKey(), client, clusterService); + ActionListener getMappingResponseListener = ActionListener.wrap(getMappingsResponse -> { + boolean foundField = false; + Map> mappingsByIndex = getMappingsResponse.mappings(); + for (Map.Entry> mappingsByField : mappingsByIndex + .entrySet()) { + if (mappingsByField.getValue().isEmpty()) { + multiGetMappingResponseListener + .onFailure( + new ValidationException( + String + .format( + Locale.ROOT, + CommonMessages.NON_EXISTENT_TIMESTAMP_IN_INDEX, + givenTimeField, + mappingsByField.getKey() + ), + ValidationIssueType.TIMEFIELD_FIELD, + configValidationAspect + ) + ); + return; + } + for (Map.Entry field2Metadata : mappingsByField + .getValue() + .entrySet()) { + GetFieldMappingsResponse.FieldMappingMetadata fieldMetadata = field2Metadata.getValue(); + if (fieldMetadata != null) { + // sourceAsMap returns sth like {host2={type=keyword}} with host2 being a nested field + Map fieldMap = fieldMetadata.sourceAsMap(); + if (fieldMap != null) { + for (Object type : fieldMap.values()) { + if (type instanceof Map) { + foundField = true; + Map metadataMap = (Map) type; + String typeName = (String) metadataMap.get(CommonName.TYPE); + if (!typeName.equals(CommonName.DATE_TYPE) && !typeName.equals(CommonName.DATE_NANOS_TYPE)) { + multiGetMappingResponseListener + .onFailure( + new ValidationException( + String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, givenTimeField), + ValidationIssueType.TIMEFIELD_FIELD, + configValidationAspect + ) + ); + return; + } } } } } } } - } - if (!foundField) { - listener - .onFailure( - new ValidationException( - String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, givenTimeField), - ValidationIssueType.TIMEFIELD_FIELD, - configValidationAspect + if (!foundField) { + multiGetMappingResponseListener + .onFailure( + new ValidationException( + String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, givenTimeField), + ValidationIssueType.TIMEFIELD_FIELD, + configValidationAspect + ) + ); + return; + } + + multiGetMappingResponseListener + .onResponse( + new MergeableList>( + new ArrayList>(Collections.singletonList(Optional.empty())) ) ); - return; - } - prepareConfigIndexing(indexingDryRun, listener); - }, error -> { - String message = String.format(Locale.ROOT, "Fail to get the index mapping of %s", config.getIndices()); - logger.error(message, error); - listener.onFailure(new IllegalArgumentException(message)); - }); - clientUtil - .executeWithInjectedSecurity(GetFieldMappingsAction.INSTANCE, getMappingsRequest, user, client, context, mappingsListener); + }, e -> { + String errorMessage = String.format(Locale.ROOT, "Fail to get the index mapping of %s", clusterIndicesEntry.getValue()); + logger.error(errorMessage, e); + multiGetMappingResponseListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST, e)); + }); + clientUtil + .executeWithInjectedSecurity( + GetFieldMappingsAction.INSTANCE, + getMappingsRequestForIndex, + user, + targetClusterClient, + context, + getMappingResponseListener + ); + } } /** @@ -448,7 +483,6 @@ protected void validateAgainstExistingHCConfig(String configId, boolean indexing QueryBuilder query = QueryBuilders.boolQuery().filter(QueryBuilders.existsQuery(Config.CATEGORY_FIELD)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeout); - SearchRequest searchRequest = new SearchRequest(CommonName.CONFIG_INDEX).source(searchSourceBuilder); client .search( @@ -460,7 +494,7 @@ protected void validateAgainstExistingHCConfig(String configId, boolean indexing ) ); } else { - validateCategoricalField(configId, indexingDryRun, listener); + validateCategoricalFieldsInAllIndices(configId, indexingDryRun, listener); } } @@ -522,12 +556,33 @@ protected void onSearchHCConfigResponse(SearchResponse response, String detector } listener.onFailure(new IllegalArgumentException(errorMsg)); } else { - validateCategoricalField(detectorId, indexingDryRun, listener); + validateCategoricalFieldsInAllIndices(detectorId, indexingDryRun, listener); } } - @SuppressWarnings("unchecked") - protected void validateCategoricalField(String configId, boolean indexingDryRun, ActionListener listener) { + protected void validateCategoricalFieldsInAllIndices(String configId, boolean indexingDryRun, ActionListener listener) { + HashMap> clusterIndicesMap = CrossClusterConfigUtils + .separateClusterIndexes(config.getIndices(), clusterService); + + Iterator>> iterator = clusterIndicesMap.entrySet().iterator(); + + validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener); + + } + + protected void validateCategoricalFieldRecursive( + Iterator>> iterator, + String configId, + boolean indexingDryRun, + ActionListener listener + ) { + if (!iterator.hasNext()) { + searchConfigInputIndices(configId, indexingDryRun, listener); // Call after all indices are validated + return; + } + + // Get the next cluster indices entry + Map.Entry> clusterIndicesEntry = iterator.next(); List categoryField = config.getCategoryFields(); // categoryField should have at least 1 element. Otherwise, we won't reach here. @@ -537,12 +592,16 @@ protected void validateCategoricalField(String configId, boolean indexingDryRun, // throws validation exception before reaching here String categoryField0 = categoryField.get(0); - - GetFieldMappingsRequest getMappingsRequest = new GetFieldMappingsRequest(); - getMappingsRequest.indices(config.getIndices().toArray(new String[0])).fields(categoryField.toArray(new String[0])); - getMappingsRequest.indicesOptions(IndicesOptions.strictExpand()); - - ActionListener mappingsListener = ActionListener.wrap(getMappingsResponse -> { + Client targetClusterClient = CrossClusterConfigUtils.getClientForCluster(clusterIndicesEntry.getKey(), client, clusterService); + // Create the GetFieldMappingsRequest for each index + GetFieldMappingsRequest getMappingsRequestForIndex = new GetFieldMappingsRequest(); + getMappingsRequestForIndex + .indices(clusterIndicesEntry.getValue().toArray(new String[0])) + .fields(categoryField.toArray(new String[0])); + getMappingsRequestForIndex.indicesOptions(IndicesOptions.strictExpand()); + + // Define the listener for each getMapping request + ActionListener getMappingsListener = ActionListener.wrap(getMappingsResponse -> { // example getMappingsResponse: // GetFieldMappingsResponse{mappings={server-metrics={_doc={service=FieldMappingMetadata{fullName='service', // source=org.opensearch.core.common.bytes.BytesArray@7ba87dbd}}}}} @@ -596,18 +655,25 @@ protected void validateCategoricalField(String configId, boolean indexingDryRun, ); return; } + validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener); - searchConfigInputIndices(configId, indexingDryRun, listener); }, error -> { String message = String.format(Locale.ROOT, CommonMessages.FAIL_TO_GET_MAPPING_MSG, config.getIndices()); logger.error(message, error); listener.onFailure(new IllegalArgumentException(message)); }); - clientUtil - .executeWithInjectedSecurity(GetFieldMappingsAction.INSTANCE, getMappingsRequest, user, client, context, mappingsListener); + .executeWithInjectedSecurity( + GetFieldMappingsAction.INSTANCE, + getMappingsRequestForIndex, + user, + targetClusterClient, + context, + getMappingsListener + ); } + @SuppressWarnings("unchecked") protected void searchConfigInputIndices(String configId, boolean indexingDryRun, ActionListener listener) { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .query(QueryBuilders.matchAllQuery()) diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java index dca5d9ee1..d2fe30c02 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java @@ -8,10 +8,7 @@ import static org.opensearch.timeseries.util.ParseUtils.checkFilterByBackendRoles; import java.time.Clock; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; +import java.util.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -205,7 +202,6 @@ public void validateExecute( storedContext.restore(); Config config = request.getConfig(); ActionListener validateListener = ActionListener.wrap(response -> { - logger.debug("Result of validation process " + response); // forcing response to be empty listener.onResponse(new ValidateConfigResponse((ConfigValidationIssue) null)); }, exception -> { diff --git a/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java b/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java new file mode 100644 index 000000000..72cf2f400 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.timeseries.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; + +public class CrossClusterConfigUtils { + private static final Logger logger = LogManager.getLogger(ParseUtils.class); + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local {@link NodeClient}. + * @param localClusterName The name of the local cluster. + * @return The local {@link NodeClient} for the local cluster, or a remote client for a remote cluster. + */ + public static Client getClientForCluster(String clusterName, Client client, String localClusterName) { + return clusterName.equals(localClusterName) ? client : client.getRemoteClusterClient(clusterName); + } + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local {@link NodeClient}. + * @param clusterService Used to retrieve the name of the local cluster. + * @return The local {@link NodeClient} for the local cluster, or a remote client for a remote cluster. + */ + public static Client getClientForCluster(String clusterName, Client client, ClusterService clusterService) { + logger.info("clusterName1: " + clusterName); + logger.info("clusterService.getClusterName().value(): " + clusterService.getClusterName().value()); + + return getClientForCluster(clusterName, client, clusterService.getClusterName().value()); + } + + /** + * Parses the list of indexes into a map of cluster_name to List of index names + * @param indexes A list of index names in cluster_name:index_name format. + * Local indexes can also be in index_name format. + * @param clusterService Used to retrieve the name of the local cluster. + * @return A map of cluster_name:index names + */ + public static HashMap> separateClusterIndexes(List indexes, ClusterService clusterService) { + return separateClusterIndexes(indexes, clusterService.getClusterName().value()); + } + + /** + * Parses the list of indexes into a map of cluster_name to list of index_name + * @param indexes A list of index names in cluster_name:index_name format. + * @param localClusterName The name of the local cluster. + * @return A map of cluster_name to List index_name + */ + public static HashMap> separateClusterIndexes(List indexes, String localClusterName) { + HashMap> output = new HashMap<>(); + for (String index : indexes) { + String clusterName = parseClusterName(index); + String indexName = parseIndexName(index); + + // If the index entry does not have a cluster_name, it indicates the index is on the local cluster. + if (clusterName.isEmpty()) { + clusterName = localClusterName; + } + output.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(indexName); + } + return output; + } + + /** + * @param index The name of the index to evaluate. + * Can be in either cluster_name:index_name or index_name format. + * @return The index name. + */ + public static String parseIndexName(String index) { + if (index.contains(":")) { + String[] parts = index.split(":"); + return parts.length > 1 ? parts[1] : index; + } else { + return index; + } + } + + /** + * @param index The name of the index to evaluate. + * Can be in either cluster_name:index_name or index_name format. + * @return The index name. + */ + public static String parseClusterName(String index) { + return index.contains(":") ? index.substring(0, index.indexOf(':')) : ""; + } +} diff --git a/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java b/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java index 7dd830435..72dbc3361 100644 --- a/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java +++ b/src/main/java/org/opensearch/timeseries/util/MultiResponsesDelegateActionListener.java @@ -11,10 +11,7 @@ package org.opensearch.timeseries.util; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Locale; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java index 5bee84cba..d1228ae98 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java @@ -73,6 +73,8 @@ public class AbstractForecasterActionHandlerTestCase extends AbstractTimeSeriesT protected ThreadContext threadContext; protected SecurityClientUtil clientUtil; protected String categoricalField; + // @Mock + protected ClusterName clusterName; @SuppressWarnings("unchecked") @Override @@ -85,6 +87,9 @@ public void setUp() throws Exception { clusterService = mock(ClusterService.class); ClusterName clusterName = new ClusterName("test"); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); ClusterState clusterState = ClusterState.builder(clusterName).metadata(Metadata.builder().build()).build(); when(clusterService.state()).thenReturn(clusterState); diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index c62e975cf..36d8157d7 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -100,6 +100,7 @@ public class IndexAnomalyDetectorActionHandlerTests extends AbstractTimeSeriesTe private RestRequest.Method method; private ADTaskManager adTaskManager; private SearchFeatureDao searchFeatureDao; + private ClusterName clusterName; @BeforeClass public static void beforeClass() { @@ -157,6 +158,10 @@ public void setUp() throws Exception { searchFeatureDao = mock(SearchFeatureDao.class); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); + handler = new IndexAnomalyDetectorActionHandler( clusterService, clientMock, diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java index e78b154ea..86702129e 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java @@ -40,6 +40,7 @@ import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.routing.AllocationId; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; @@ -240,7 +241,7 @@ public void doE verify(clientSpy, times(1)).execute(eq(GetAction.INSTANCE), any(), any()); } - public void testFaiToParse() throws InterruptedException { + public void testFailToParse() throws InterruptedException { NodeClient client = new NodeClient(Settings.EMPTY, threadPool) { @Override public void doExecute( @@ -273,6 +274,9 @@ public void doE } }; NodeClient clientSpy = spy(client); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); method = RestRequest.Method.PUT; @@ -508,6 +512,9 @@ public void doE } }; NodeClient clientSpy = spy(client); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); method = RestRequest.Method.POST; diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java index 2fea7b5db..a5d88ef53 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java @@ -43,6 +43,7 @@ import org.opensearch.ad.task.ADTaskManager; import org.opensearch.client.Client; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -91,6 +92,7 @@ public class ValidateAnomalyDetectorActionHandlerTests extends AbstractTimeSerie @Mock protected ThreadPool threadPool; protected ThreadContext threadContext; + protected ClusterName mockClusterName; @SuppressWarnings("unchecked") @Override @@ -106,7 +108,9 @@ public void setUp() throws Exception { anomalyDetectionIndices = mock(ADIndexManagement.class); when(anomalyDetectionIndices.doesConfigIndexExist()).thenReturn(true); - + mockClusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(mockClusterName); + when(mockClusterName.value()).thenReturn("test"); detectorId = "123"; seqNo = 0L; primaryTerm = 0L; diff --git a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java index 8550dce8a..041c1c512 100644 --- a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java @@ -490,9 +490,11 @@ public void testValidateAnomalyDetectorWithNonExistentTimefield() throws IOExcep ValidateConfigResponse response = client().execute(ValidateAnomalyDetectorAction.INSTANCE, request).actionGet(5_000); assertEquals(ValidationIssueType.TIMEFIELD_FIELD, response.getIssue().getType()); assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect()); - assertEquals( - String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, anomalyDetector.getTimeField()), - response.getIssue().getMessage() + assertTrue( + response + .getIssue() + .getMessage() + .contains(String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField())) ); } @@ -513,9 +515,11 @@ public void testValidateAnomalyDetectorWithNonDateTimeField() throws IOException ValidateConfigResponse response = client().execute(ValidateAnomalyDetectorAction.INSTANCE, request).actionGet(5_000); assertEquals(ValidationIssueType.TIMEFIELD_FIELD, response.getIssue().getType()); assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect()); - assertEquals( - String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField()), - response.getIssue().getMessage() + assertTrue( + response + .getIssue() + .getMessage() + .contains(String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField())) ); } diff --git a/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java b/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java new file mode 100644 index 000000000..19fe6ffd6 --- /dev/null +++ b/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java @@ -0,0 +1,66 @@ +package org.opensearch.timeseries.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.mockito.Mock; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.test.OpenSearchTestCase; + +public class CrossClusterConfigUtilsTests extends OpenSearchTestCase { + + @Mock + private Client clientMock; + + public void testGetClientForClusterLocalCluster() { + String clusterName = "localCluster"; + Client mockClient = mock(NodeClient.class); + String localClusterName = "localCluster"; + + Client result = CrossClusterConfigUtils.getClientForCluster(clusterName, mockClient, localClusterName); + + assertEquals(mockClient, result); + } + + public void testGetClientForClusterRemoteCluster() { + String clusterName = "remoteCluster"; + Client mockClient = mock(NodeClient.class); + // Client mockRemoteClient = mock(Client.class); + + when(mockClient.getRemoteClusterClient(clusterName)).thenReturn(mockClient); + + Client result = CrossClusterConfigUtils.getClientForCluster(clusterName, mockClient, "localCluster"); + + assertEquals(mockClient, result); + } + + public void testSeparateClusterIndexesRemoteCluster() { + List indexes = Arrays.asList("remoteCluster:index1", "index2"); + ClusterService mockClusterService = mock(ClusterService.class); + when(mockClusterService.getClusterName()).thenReturn(new ClusterName("localCluster")); + + HashMap> result = CrossClusterConfigUtils.separateClusterIndexes(indexes, mockClusterService); + + assertEquals(2, result.size()); + assertEquals(Arrays.asList("index1"), result.get("remoteCluster")); + assertEquals(Arrays.asList("index2"), result.get("localCluster")); + } + + public void testParseIndexName() { + assertEquals("index1", CrossClusterConfigUtils.parseIndexName("remoteCluster:index1")); + assertEquals("index2", CrossClusterConfigUtils.parseIndexName("index2")); + } + + public void testParseClusterName() { + assertEquals("remoteCluster", CrossClusterConfigUtils.parseClusterName("remoteCluster:index1")); + assertEquals("", CrossClusterConfigUtils.parseClusterName("index2")); + } +}