Handle exceptions gracefully when deleting non-existent resources during integ test resource cleanup #1154

Open · wants to merge 4 commits into base: main

@@ -8,6 +8,9 @@
 import java.nio.file.Path;
 import java.util.Locale;
 import java.util.Optional;
+
+import lombok.SneakyThrows;
+import org.junit.After;
 import org.junit.Before;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.neuralsearch.BaseNeuralSearchIT;
@@ -20,11 +23,30 @@

 public abstract class AbstractRestartUpgradeRestTestCase extends BaseNeuralSearchIT {

+    // Resources to be cleaned up after each test; the actual values must be assigned in the test itself
+    protected String modelId;
+    protected String ingestPipelineName;
+    protected String searchPipelineName;
+    protected String indexName;
+
-    protected String getIndexNameForTest() {
+    @Before
+    public void initialize() {
+        // Initialize variables
+        this.modelId = null;
+        this.ingestPipelineName = null;
+        this.searchPipelineName = null;
+
         // Creating index name by concatenating "neural-bwc-" prefix with test method name
         // for all the tests in this sub-project
-        return NEURAL_SEARCH_BWC_PREFIX + getTestName().toLowerCase(Locale.ROOT);
+        this.indexName = NEURAL_SEARCH_BWC_PREFIX + getTestName().toLowerCase(Locale.ROOT);
     }

+    @SneakyThrows
+    @After
+    public void cleanUpResources() {
+        if (!isRunningAgainstOldCluster()) {
+            wipeOfTestResources(this.indexName, this.ingestPipelineName, this.modelId, this.searchPipelineName);
+        }
+    }
+
     @Override
@@ -53,6 +75,16 @@ protected final Settings restClientSettings() {
             .build();
     }

+    @Override
+    protected boolean shouldCleanUpResources() {
+        // All NEW CLUSTER tests depend on resources created in OLD CLUSTER test cases.
+        // Before the NEW CLUSTER tests run, all OLD CLUSTER test cases will have run first.
+        // We only want to clean up resources in NEW CLUSTER tests, and not after each test case finishes,
+        // because the cleanup method would pull every resource and delete it, which would impact other tests.
+        // Overriding the method in the base class so that resources won't be accidentally cleaned up.
+        return false;
+    }
+
     protected static final boolean isRunningAgainstOldCluster() {
         return Boolean.parseBoolean(System.getProperty(RESTART_UPGRADE_OLD_CLUSTER));
     }
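
Note: the graceful handling referenced in the PR title lives in the shared test base (wipeOfTestResources and its delete helpers), which is not part of the hunks shown above. A minimal sketch of what a tolerant delete could look like, assuming the OpenSearch REST test framework's adminClient(), Request, and ResponseException; the helper name and structure here are hypothetical, not the actual implementation:

import java.io.IOException;

import org.opensearch.client.Request;
import org.opensearch.client.ResponseException;

// Hypothetical helper: treat "resource not found" as a successful cleanup
// instead of failing the whole @After hook.
private void deleteIndexIgnoringMissing(final String indexName) throws IOException {
    if (indexName == null) {
        return; // the test never created/assigned this resource
    }
    try {
        adminClient().performRequest(new Request("DELETE", "/" + indexName));
    } catch (ResponseException e) {
        // 404 means the index is already gone; anything else is a real failure
        if (e.getResponse().getStatusLine().getStatusCode() != 404) {
            throw e;
        }
    }
}

The same null-check-then-ignore-404 shape would apply to pipelines and models, which is what would let cleanUpResources() pass all four fields unconditionally.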

@@ -23,30 +23,26 @@ public class BatchIngestionIT extends AbstractRestartUpgradeRestTestCase {

     public void testBatchIngestionWithNeuralSparseProcessor_E2EFlow() throws Exception {
         waitForClusterHealthGreen(NODES_BWC_CLUSTER);
-        String indexName = getIndexNameForTest();
+        super.ingestPipelineName = PIPELINE_NAME;

         if (isRunningAgainstOldCluster()) {
-            String modelId = uploadSparseEncodingModel();
-            loadModel(modelId);
-            createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME, batchSize);
+            super.modelId = uploadSparseEncodingModel();
+            loadModel(super.modelId);
+            createPipelineForSparseEncodingProcessor(super.modelId, PIPELINE_NAME, batchSize);
             createIndexWithConfiguration(
-                indexName,
+                super.indexName,
                 Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())),
                 PIPELINE_NAME
             );
             List<Map<String, String>> docs = prepareDataForBulkIngestion(0, 5);
-            bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs);
-            validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class);
+            bulkAddDocuments(super.indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs);
+            validateDocCountAndInfo(super.indexName, 5, () -> getDocById(super.indexName, "4"), EMBEDDING_FIELD_NAME, Map.class);
         } else {
-            String modelId = null;
-            modelId = TestUtils.getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR);
-            loadModel(modelId);
-            try {
-                List<Map<String, String>> docs = prepareDataForBulkIngestion(5, 5);
-                bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs);
-                validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class);
-            } finally {
-                wipeOfTestResources(indexName, PIPELINE_NAME, modelId, null);
-            }
+            super.modelId = TestUtils.getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR);
+            loadModel(super.modelId);
+            List<Map<String, String>> docs = prepareDataForBulkIngestion(5, 5);
+            bulkAddDocuments(super.indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs);
+            validateDocCountAndInfo(super.indexName, 10, () -> getDocById(super.indexName, "9"), EMBEDDING_FIELD_NAME, Map.class);
         }
     }

@@ -55,30 +55,28 @@ public void testNormalizationProcessor_whenIndexWithSingleShard_E2EFlow() throws
     private void validateNormalizationProcessor(final String fileName, final String pipelineName, final String searchPipelineName)
         throws Exception {
         waitForClusterHealthGreen(NODES_BWC_CLUSTER);
+        super.ingestPipelineName = pipelineName;
+        super.searchPipelineName = searchPipelineName;

         if (isRunningAgainstOldCluster()) {
-            String modelId = uploadTextEmbeddingModel();
-            loadModel(modelId);
-            createPipelineProcessor(modelId, pipelineName);
+            super.modelId = uploadTextEmbeddingModel();

Collaborator review comment on the line above: I suggest not making any changes to the BWC tests, as this approach may not work. For instance, there are two tests: testNormalizationProcessor_whenIndexWithMultipleShards_E2EFlow and testNormalizationProcessor_whenIndexWithSingleShard_E2EFlow. Each test uploads a model, but the model ID from the previous test is overwritten by the next, resulting in a failure to delete all of the resources.

+            loadModel(super.modelId);
+            createPipelineProcessor(super.modelId, pipelineName);
             createIndexWithConfiguration(
-                getIndexNameForTest(),
+                super.indexName,
                 Files.readString(Path.of(classLoader.getResource(fileName).toURI())),
                 pipelineName
             );
-            addDocuments(getIndexNameForTest(), true);
+            addDocuments(super.indexName, true);
             createSearchPipeline(searchPipelineName);
         } else {
-            String modelId = null;
-            try {
-                modelId = getModelId(getIngestionPipeline(pipelineName), TEXT_EMBEDDING_PROCESSOR);
-                loadModel(modelId);
-                addDocuments(getIndexNameForTest(), false);
-                HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(modelId, null, null, null);
-                validateTestIndex(getIndexNameForTest(), searchPipelineName, hybridQueryBuilder);
-                hybridQueryBuilder = getQueryBuilder(modelId, Boolean.FALSE, Map.of("ef_search", 100), RescoreContext.getDefault());
-                validateTestIndex(getIndexNameForTest(), searchPipelineName, hybridQueryBuilder);
-            } finally {
-                wipeOfTestResources(getIndexNameForTest(), pipelineName, modelId, searchPipelineName);
-            }
+            super.modelId = getModelId(getIngestionPipeline(pipelineName), TEXT_EMBEDDING_PROCESSOR);
+            loadModel(super.modelId);
+            addDocuments(super.indexName, false);
+            HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(super.modelId, null, null, null);
+            validateTestIndex(super.indexName, searchPipelineName, hybridQueryBuilder);
+            hybridQueryBuilder = getQueryBuilder(super.modelId, Boolean.FALSE, Map.of("ef_search", 100), RescoreContext.getDefault());
+            validateTestIndex(super.indexName, searchPipelineName, hybridQueryBuilder);
         }
     }
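
A hypothetical way to address the concern in the review comment above would be to track created resources in a collection on the base class rather than a single field, so a second test's upload cannot overwrite the first test's model ID. The Set-based bookkeeping below is an illustration, not part of this PR:

import java.util.HashSet;
import java.util.Set;

// Hypothetical base-class variant: accumulate every uploaded model ID rather
// than holding one value, so tests that create several models can still clean up.
protected final Set<String> modelIds = new HashSet<>();

protected void registerModel(final String newModelId) {
    modelIds.add(newModelId); // remember each model for the @After hook
}

protected void cleanUpAllModels() {
    for (String id : modelIds) {
        // reuse the existing helper; null means "no resource of that kind to wipe"
        wipeOfTestResources(null, null, id, null);
    }
    modelIds.clear();
}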

@@ -41,37 +41,33 @@ public class HybridSearchWithRescoreIT extends AbstractRestartUpgradeRestTestCase {
      */
     public void testHybridQueryWithRescore_whenIndexWithMultipleShards_E2EFlow() throws Exception {
         waitForClusterHealthGreen(NODES_BWC_CLUSTER);
+        super.ingestPipelineName = PIPELINE_NAME;

         if (isRunningAgainstOldCluster()) {
-            String modelId = uploadTextEmbeddingModel();
-            loadModel(modelId);
-            createPipelineProcessor(modelId, PIPELINE_NAME);
+            super.modelId = uploadTextEmbeddingModel();
+            loadModel(super.modelId);
+            createPipelineProcessor(super.modelId, PIPELINE_NAME);
             createIndexWithConfiguration(
-                getIndexNameForTest(),
+                super.indexName,
                 Files.readString(Path.of(classLoader.getResource("processor/IndexMappingMultipleShard.json").toURI())),
                 PIPELINE_NAME
             );
-            addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null);
+            addDocument(super.indexName, "0", TEST_FIELD, TEXT, null, null);
             createSearchPipeline(
                 SEARCH_PIPELINE_NAME,
                 DEFAULT_NORMALIZATION_METHOD,
                 DEFAULT_COMBINATION_METHOD,
                 Map.of(PARAM_NAME_WEIGHTS, Arrays.toString(new float[] { 0.3f, 0.7f }))
             );
         } else {
-            String modelId = null;
-            try {
-                modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
-                loadModel(modelId);
-                addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_UPGRADED, null, null);
-                HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(modelId, null, null);
-                QueryBuilder rescorer = QueryBuilders.matchQuery(TEST_FIELD, RESCORE_QUERY).boost(0.3f);
-                validateTestIndex(getIndexNameForTest(), hybridQueryBuilder, rescorer);
-                hybridQueryBuilder = getQueryBuilder(modelId, Map.of("ef_search", 100), RescoreContext.getDefault());
-                validateTestIndex(getIndexNameForTest(), hybridQueryBuilder, rescorer);
-            } finally {
-                wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
-            }
+            super.modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
+            loadModel(super.modelId);
+            addDocument(super.indexName, "1", TEST_FIELD, TEXT_UPGRADED, null, null);
+            HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(super.modelId, null, null);
+            QueryBuilder rescorer = QueryBuilders.matchQuery(TEST_FIELD, RESCORE_QUERY).boost(0.3f);
+            validateTestIndex(super.indexName, hybridQueryBuilder, rescorer);
+            hybridQueryBuilder = getQueryBuilder(super.modelId, Map.of("ef_search", 100), RescoreContext.getDefault());
+            validateTestIndex(super.indexName, hybridQueryBuilder, rescorer);
         }
     }

@@ -26,27 +26,24 @@ public class KnnRadialSearchIT extends AbstractRestartUpgradeRestTestCase {
     // Validate radial query, pipeline and document count in restart-upgrade scenario
     public void testKnnRadialSearch_E2EFlow() throws Exception {
         waitForClusterHealthGreen(NODES_BWC_CLUSTER);
+        super.ingestPipelineName = PIPELINE_NAME;

         if (isRunningAgainstOldCluster()) {
-            String modelId = uploadTextEmbeddingModel();
-            loadModel(modelId);
-            createPipelineForTextImageProcessor(modelId, PIPELINE_NAME);
+            super.modelId = uploadTextEmbeddingModel();
+            loadModel(super.modelId);
+            createPipelineForTextImageProcessor(super.modelId, PIPELINE_NAME);
             createIndexWithConfiguration(
-                getIndexNameForTest(),
+                super.indexName,
                 Files.readString(Path.of(classLoader.getResource("processor/IndexMappingMultipleShard.json").toURI())),
                 PIPELINE_NAME
             );
-            addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT);
+            addDocument(super.indexName, "0", TEST_FIELD, TEXT, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT);
         } else {
-            String modelId = null;
-            try {
-                modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR);
-                loadModel(modelId);
-                addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_1, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT_1);
-                validateIndexQuery(modelId);
-            } finally {
-                wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
-            }
+            super.modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR);
+
+            loadModel(super.modelId);
+            addDocument(super.indexName, "1", TEST_FIELD, TEXT_1, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT_1);
+            validateIndexQuery(super.modelId);
         }
     }

@@ -59,7 +56,7 @@ private void validateIndexQuery(final String modelId) {
             .minScore(0.01f)
             .build();

-        Map<String, Object> responseWithMinScoreQuery = search(getIndexNameForTest(), neuralQueryBuilderWithMinScoreQuery, 1);
+        Map<String, Object> responseWithMinScoreQuery = search(super.indexName, neuralQueryBuilderWithMinScoreQuery, 1);
         assertNotNull(responseWithMinScoreQuery);

         NeuralQueryBuilder neuralQueryBuilderWithMaxDistanceQuery = NeuralQueryBuilder.builder()
@@ -69,7 +66,7 @@
             .modelId(modelId)
             .maxDistance(100000f)
             .build();
-        Map<String, Object> responseWithMaxDistanceQuery = search(getIndexNameForTest(), neuralQueryBuilderWithMaxDistanceQuery, 1);
+        Map<String, Object> responseWithMaxDistanceQuery = search(super.indexName, neuralQueryBuilderWithMaxDistanceQuery, 1);
         assertNotNull(responseWithMaxDistanceQuery);
     }
 }

@@ -10,6 +10,7 @@
 import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
 import static org.opensearch.neuralsearch.util.TestUtils.TEXT_IMAGE_EMBEDDING_PROCESSOR;
 import static org.opensearch.neuralsearch.util.TestUtils.getModelId;
+
 import org.opensearch.neuralsearch.query.NeuralQueryBuilder;

@@ -26,32 +27,28 @@ public class MultiModalSearchIT extends AbstractRestartUpgradeRestTestCase {
     // Validate process , pipeline and document count in restart-upgrade scenario
     public void testTextImageEmbeddingProcessor_E2EFlow() throws Exception {
         waitForClusterHealthGreen(NODES_BWC_CLUSTER);
+        super.ingestPipelineName = PIPELINE_NAME;

         if (isRunningAgainstOldCluster()) {
-            String modelId = uploadTextEmbeddingModel();
-            loadModel(modelId);
-            createPipelineForTextImageProcessor(modelId, PIPELINE_NAME);
+            super.modelId = uploadTextEmbeddingModel();
+            loadModel(super.modelId);
+            createPipelineForTextImageProcessor(super.modelId, PIPELINE_NAME);
             createIndexWithConfiguration(
-                getIndexNameForTest(),
+                super.indexName,
                 Files.readString(Path.of(classLoader.getResource("processor/IndexMappingMultipleShard.json").toURI())),
                 PIPELINE_NAME
             );
-            addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT);
+            addDocument(super.indexName, "0", TEST_FIELD, TEXT, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT);
         } else {
-            String modelId = null;
-            try {
-                modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR);
-                loadModel(modelId);
-                addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_1, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT_1);
-                validateTestIndex(modelId);
-            } finally {
-                wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
-            }
+            super.modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR);
+            loadModel(super.modelId);
+            addDocument(super.indexName, "1", TEST_FIELD, TEXT_1, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT_1);
+            validateTestIndex(super.modelId);
        }
     }

     private void validateTestIndex(final String modelId) throws Exception {
-        int docCount = getDocCount(getIndexNameForTest());
+        int docCount = getDocCount(super.indexName);
         assertEquals(2, docCount);
         NeuralQueryBuilder neuralQueryBuilder = NeuralQueryBuilder.builder()
             .fieldName("passage_embedding")
@@ -60,7 +57,7 @@ private void validateTestIndex(final String modelId) throws Exception {
             .modelId(modelId)
             .k(1)
             .build();
-        Map<String, Object> response = search(getIndexNameForTest(), neuralQueryBuilder, 1);
+        Map<String, Object> response = search(super.indexName, neuralQueryBuilder, 1);
         assertNotNull(response);
     }
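
Taken together, the base-class changes give every restart-upgrade test the same lifecycle: initialize() resets the tracking fields before each test, the test assigns whichever resources it actually creates, and cleanUpResources() wipes them after each test in the new-cluster phase. A minimal sketch of a test written against this pattern; the class name, pipeline name, and test body are illustrative only:

import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.neuralsearch.util.TestUtils.TEXT_EMBEDDING_PROCESSOR;
import static org.opensearch.neuralsearch.util.TestUtils.getModelId;

public class ExampleBwcIT extends AbstractRestartUpgradeRestTestCase {

    private static final String PIPELINE_NAME = "nlp-example-pipeline"; // hypothetical

    public void testExampleProcessor_E2EFlow() throws Exception {
        waitForClusterHealthGreen(NODES_BWC_CLUSTER);
        // Register the resource on the inherited field; cleanup picks it up automatically.
        super.ingestPipelineName = PIPELINE_NAME;

        if (isRunningAgainstOldCluster()) {
            super.modelId = uploadTextEmbeddingModel();
            loadModel(super.modelId);
            // ... create the pipeline and index, then seed documents ...
        } else {
            super.modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
            loadModel(super.modelId);
            // ... run queries and assertions; no try/finally needed, since
            // cleanUpResources() runs after the test in the new-cluster phase
        }
    }
}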