diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 9c79fb9e2..1dab11e64 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -527,6 +527,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin RollupSettings.ROLLUP_SEARCH_ENABLED, RollupSettings.ROLLUP_DASHBOARDS, RollupSettings.ROLLUP_SEARCH_ALL_JOBS, + RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS, TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index c65584afe..ba72b6426 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -64,6 +64,8 @@ class RollupInterceptor( @Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings) + @Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings) + init { clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) { searchEnabled = it @@ -71,6 +73,9 @@ class RollupInterceptor( clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) { searchAllJobs = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES) { + searchRawRollupIndices = it + } } @Suppress("SpreadOperator") @@ -143,13 +148,15 @@ class RollupInterceptor( var allMatchingRollupJobs: Map> = mapOf() for (concreteIndex in concreteIndices) { val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs() - ?: throw IllegalArgumentException("Not all indices have rollup job") - - val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) - if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) { - throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues") + if (rollupJobs != null) { + val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) + if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) { + throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues") + } + allMatchingRollupJobs += matchingRollupJobs + } else if (!searchRawRollupIndices) { + throw IllegalArgumentException("Not all indices have rollup job") } - allMatchingRollupJobs += matchingRollupJobs } return allMatchingRollupJobs } @@ -342,6 +349,9 @@ class RollupInterceptor( if (searchAllJobs) { request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex)) } else { + if (matchingRollupJobs.keys.size > 1) { + logger.trace("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window") + } request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex)) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt index bc7228853..1d11a36db 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt @@ -14,6 +14,7 @@ class RollupSettings { companion object { const val DEFAULT_ROLLUP_ENABLED = true const val DEFAULT_SEARCH_ALL_JOBS = false + const val DEFAULT_SEARCH_SOURCE_INDICES = false const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3 const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3 @@ -78,11 +79,20 @@ class RollupSettings { Setting.Property.Dynamic, ) - val ROLLUP_DASHBOARDS: Setting = Setting.boolSetting( - "plugins.rollup.dashboards.enabled", - LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS, - Setting.Property.NodeScope, - Setting.Property.Dynamic, - ) + val ROLLUP_SEARCH_SOURCE_INDICES: Setting = + Setting.boolSetting( + "plugins.rollup.search.search_source_indices", + DEFAULT_SEARCH_SOURCE_INDICES, + Setting.Property.NodeScope, + Setting.Property.Dynamic, + ) + + val ROLLUP_DASHBOARDS: Setting = + Setting.boolSetting( + "plugins.rollup.dashboards.enabled", + LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS, + Setting.Property.NodeScope, + Setting.Property.Dynamic, + ) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt index 7988ee2e2..97c47e0c5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt @@ -94,6 +94,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { RollupSettings.ROLLUP_ENABLED, RollupSettings.ROLLUP_SEARCH_ENABLED, RollupSettings.ROLLUP_SEARCH_ALL_JOBS, + RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES, RollupSettings.ROLLUP_DASHBOARDS, SnapshotManagementSettings.FILTER_BY_BACKEND_ROLES, ), @@ -177,6 +178,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false) assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false) assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false) + assertEquals(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings), false) assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1)) assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1) assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1)) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 117ec111f..57dc65904 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -263,6 +263,24 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { assertEquals("Request failed", RestStatus.OK, res.restStatus()) } + protected fun updateSearchRawRollupClusterSetting(value: Boolean) { + val formattedValue = "\"${value}\"" + val request = + """ + { + "persistent": { + "${RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.key}": $formattedValue + } + } + """.trimIndent() + val res = + client().makeRequest( + "PUT", "_cluster/settings", emptyMap(), + StringEntity(request, APPLICATION_JSON), + ) + assertEquals("Request failed", RestStatus.OK, res.restStatus()) + } + protected fun createSampleIndexForQSQTest(index: String) { val mapping = """ "properties": { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 305fdfa2d..7d74bbf42 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1040,12 +1040,12 @@ class RollupInterceptorIT : RollupRestTestCase() { }, "aggs": { "sum_passenger_count": { "sum": { "field": "passenger_count" } }, - "max_passenger_count": { "max": { "field": "passenger_count" } }, - "value_count_passenger_count": { "value_count": { "field": "passenger_count" } } + "max_passenger_count": { "max": { "field": "passenger_count" } } } } """.trimIndent() - // Search 1 non-rollup index and 1 rollup +// Search 1 non-rollup index and 1 rollup + updateSearchRawRollupClusterSetting(false) val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) assertTrue(searchResult1.restStatus() == RestStatus.OK) val failures = extractFailuresFromSearchResponse(searchResult1) @@ -1061,6 +1061,29 @@ class RollupInterceptorIT : RollupRestTestCase() { "Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response", ) + // Updating to allow searching on non-rollup and rolled-up index together + updateSearchRawRollupClusterSetting(true) + val rawRes1 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes1.restStatus() == RestStatus.OK) + val rawRes2 = client().makeRequest("POST", "/$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes2.restStatus() == RestStatus.OK) + val searchResult = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(searchResult.restStatus() == RestStatus.OK) + val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map> + val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map> + val rollupAggResMulti = searchResult.asMap()["aggregations"] as Map> + + val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double + + assertEquals( + "Searching single raw source index and rollup target index did not return the same sum results", + rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"], + ) + assertEquals( + "Searching rollup target index did not return the sum for all of the rollup jobs on the index", + trueAggSum, rollupAggResMulti.getValue("sum_passenger_count")["value"], + ) + // Search 2 rollups with different mappings try { client().makeRequest(