optimize doc-level monitor execution workflow for datastreams (opensearch-project#1302)

* optimize doc-level monitor execution for datastreams

Signed-off-by: Subhobrata Dey <[email protected]>

* add more tests to address comments

Signed-off-by: Subhobrata Dey <[email protected]>

* add integTest for multiple datastreams inside a single index pattern

* add integTest for multiple datastreams inside a single index pattern

Signed-off-by: Subhobrata Dey <[email protected]>

---------

Signed-off-by: Subhobrata Dey <[email protected]>
sbcd90 authored and goyamegh committed Mar 14, 2024
1 parent c076bfa commit c4c48db
Showing 6 changed files with 623 additions and 52 deletions.
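
At a high level, this change stops the document-level monitor from re-scanning every backing index of a data stream or index alias on each run. The runner now looks up the backing index that was tracked as the write index during the previous execution and only considers indices created at or after that index's creation date. The snippet below is a simplified, self-contained sketch of that filtering rule; the BackingIndex class and the sample data are invented for illustration, while the real code reads index metadata from the cluster state through the new IndexUtils helpers further down in the diff.

```kotlin
// Simplified stand-in for a data stream's backing indices; the plugin itself reads
// creation dates from ClusterState rather than from a data class like this.
data class BackingIndex(val name: String, val creationDate: Long)

// Keep the previously tracked write index and anything created at or after it,
// mirroring the IndexUtils.getNewestIndicesByCreationDate logic in the diff below.
fun indicesToScan(backingIndices: List<BackingIndex>, lastRunIndexNames: Set<String>): List<String> {
    val lastWriteIndex = backingIndices.find { it.name in lastRunIndexNames }
        ?: return backingIndices.map { it.name } // nothing tracked yet: scan everything
    return backingIndices
        .filter { it.creationDate >= lastWriteIndex.creationDate }
        .map { it.name }
}

fun main() {
    val backing = listOf(
        BackingIndex(".ds-logs-000001", creationDate = 1_000),
        BackingIndex(".ds-logs-000002", creationDate = 2_000),
        BackingIndex(".ds-logs-000003", creationDate = 3_000),
    )
    // The previous run tracked .ds-logs-000002, so generation 000001 is skipped.
    println(indicesToScan(backing, setOf(".ds-logs-000002")))
    // -> [.ds-logs-000002, .ds-logs-000003]
}
```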
@@ -30,6 +30,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.XContentFactory
@@ -56,7 +57,6 @@ import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.rest.RestStatus
import org.opensearch.search.SearchHits
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
@@ -120,12 +120,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
// Resolve all passed indices to concrete indices
val concreteIndices = IndexUtils.resolveAllIndices(
val allConcreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (concreteIndices.isEmpty()) {
if (allConcreteIndices.isEmpty()) {
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}
@@ -139,8 +139,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
if (!concreteIndices.contains(ind)) {
val runContextKeys = updatedLastRunContext.keys.toMutableSet()
for (ind in runContextKeys) {
if (!allConcreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}
@@ -149,11 +150,26 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

docLevelMonitorInput.indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
var lastWriteIndex: String? = null
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
@@ -178,7 +194,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
} else {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
@@ -643,6 +668,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.query(boolQueryBuilder)
.size(10000) // fixme: make this configurable.
)
.preference(Preference.PRIMARY_FIRST.type())
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
@@ -675,7 +701,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
)
}
val searchRequest = SearchRequest(queryIndex)
val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type())
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
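
The runner hunks above also rework how the per-index run context is carried between executions for aliases and data streams: only the current write index receives a fresh shard context, and the entry for the previously tracked write index is dropped once a rollover has happened, so the monitor does not keep polling a sealed backing index. Below is a rough, self-contained sketch of that bookkeeping; RunContext and shardContextFor are names invented for the example, not plugin APIs.

```kotlin
typealias RunContext = MutableMap<String, Any>

// Placeholder per-index shard context; the real runner records a max sequence
// number per shard so the next run can pick up where this one stopped.
fun shardContextFor(indexName: String): Map<String, Any> =
    mapOf("index" to indexName, "shards_count" to 1, "0" to -1L)

fun updateRunContextOnRollover(
    updatedLastRunContext: RunContext,
    previousWriteIndex: String?,
    currentWriteIndex: String,
) {
    // After a rollover the data stream points at a new write index; drop the old
    // entry and track the new one, as the alias/data-stream branch above does.
    if (previousWriteIndex != null && previousWriteIndex != currentWriteIndex) {
        updatedLastRunContext.remove(previousWriteIndex)
    }
    updatedLastRunContext[currentWriteIndex] = shardContextFor(currentWriteIndex)
}
```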
@@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
@@ -206,11 +207,19 @@ object MonitorMetadataService :
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
try {
if (index == null) return mutableMapOf()
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)

val indices = mutableListOf<String>()
if (IndexUtils.isAlias(index, clusterService.state()) ||
IndexUtils.isDataStream(index, clusterService.state())
) {
IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) }
} else {
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
}
indices.addAll(getIndexResponse.indices())
}
val indices = getIndexResponse.indices()

indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName)) {
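
In the MonitorMetadataService hunk above, the initial run context for an alias or data stream is now seeded only from its current write index, instead of issuing a GetIndex call and initializing a context for every backing index. A minimal sketch of that selection, with the alias/data-stream check and the index resolution passed in as lambdas so the example stays independent of ClusterState:

```kotlin
// Choose which concrete indices get an initial run context. The lambdas stand in
// for the IndexUtils/cluster-state lookups used by the real code.
fun indicesToInitialize(
    input: String,
    isAliasOrDataStream: Boolean,
    resolveWriteIndex: (String) -> String?,
    resolveAllIndices: (String) -> List<String>,
): List<String> =
    if (isAliasOrDataStream) listOfNotNull(resolveWriteIndex(input))
    else resolveAllIndices(input)
```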
@@ -207,11 +207,25 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ

// Run through each backing index and apply appropriate mappings to query index
indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val updatedProperties = mutableMapOf<String, Any>()
val allFlattenPaths = mutableSetOf<Pair<String, String>>()
@@ -13,6 +13,7 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.client.IndicesAdminClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
@@ -151,5 +152,47 @@ class IndexUtils {

return result
}

@JvmStatic
fun isDataStream(name: String, clusterState: ClusterState): Boolean {
return clusterState.metadata().dataStreams().containsKey(name)
}

@JvmStatic
fun isAlias(name: String, clusterState: ClusterState): Boolean {
return clusterState.metadata().hasAlias(name)
}

@JvmStatic
fun getWriteIndex(index: String, clusterState: ClusterState): String? {
if (isAlias(index, clusterState) || isDataStream(index, clusterState)) {
val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex
if (metadata != null) {
return metadata.index.name
}
}
return null
}

@JvmStatic
fun getNewestIndicesByCreationDate(concreteIndices: List<String>, clusterState: ClusterState, thresholdDate: Long): List<String> {
val filteredIndices = mutableListOf<String>()
val lookup = clusterState.metadata().indicesLookup
concreteIndices.forEach { indexName ->
val index = lookup[indexName]
val indexMetadata = clusterState.metadata.index(indexName)
if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) {
if (indexMetadata.creationDate >= thresholdDate) {
filteredIndices.add(indexName)
}
}
}
return filteredIndices
}

@JvmStatic
fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long {
return clusterState.metadata.index(index).creationDate
}
}
}
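
Taken together, the new IndexUtils helpers above are what the runner and the query-index updater use to prune a data stream's or alias's backing indices down to the ones still worth scanning. The sketch below shows how they compose; the pruneBackingIndices function itself and the lastRunContext parameter are illustrative, but the IndexUtils calls and the ClusterState type match the signatures added in this diff.

```kotlin
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.cluster.ClusterState

// Illustrative composition of the helpers added above; error handling is omitted.
fun pruneBackingIndices(
    indexOrAliasName: String,
    concreteIndices: List<String>,
    lastRunContext: Map<String, Any>,
    state: ClusterState,
): List<String> {
    val isAliasOrDataStream = IndexUtils.isAlias(indexOrAliasName, state) ||
        IndexUtils.isDataStream(indexOrAliasName, state)
    if (!isAliasOrDataStream) return concreteIndices

    // The backing index recorded in the previous run context was the write index then.
    val lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
        ?: return concreteIndices
    val threshold = IndexUtils.getCreationDateForIndex(lastWriteIndex, state)
    return IndexUtils.getNewestIndicesByCreationDate(concreteIndices, state, threshold)
}
```

A caller would obtain the ClusterState from clusterService.state(), as the runner hunks above do.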