Skip to content

Commit

Permalink
Force create last run context in monitor worflow metadata when workfl…
Browse files Browse the repository at this point in the history
…ow is re-enabled

Signed-off-by: Nishtha Mehrotra <[email protected]>
  • Loading branch information
Nishtha Mehrotra committed Jan 21, 2025
1 parent 03595f8 commit 4f9e27b
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,15 @@ object MonitorMetadataService :
monitor: Monitor,
createWithRunContext: Boolean = true,
skipIndex: Boolean = false,
workflowMetadataId: String? = null
workflowMetadataId: String? = null,
forceCreateLastRunContext: Boolean = false
): Pair<MonitorMetadata, Boolean> {
try {
val created = true
val metadata = getMetadata(monitor, workflowMetadataId)
var metadata = getMetadata(monitor, workflowMetadataId)
if (forceCreateLastRunContext) {
metadata = metadata?.copy(lastRunContext = createUpdatedRunContext(monitor))
}
return if (metadata != null) {
metadata to !created
} else {
Expand All @@ -159,6 +163,20 @@ object MonitorMetadataService :
}
}

private suspend fun createUpdatedRunContext(
monitor: Monitor
): Map<String, MutableMap<String, Any>> {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
else null
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex)
else emptyMap()
return runContext
}

suspend fun getMetadata(monitor: Monitor, workflowMetadataId: String? = null): MonitorMetadata? {
try {
val metadataId = MonitorMetadata.getId(monitor, workflowMetadataId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,17 @@ class TransportIndexWorkflowAction @Inject constructor(
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)

for (monitor in monitors) {
var isWorkflowRestarted = false

if (request.workflow.enabled && !currentWorkflow.enabled) {
isWorkflowRestarted = true
}

val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = true,
workflowMetadataId = workflowMetadata.id
workflowMetadataId = workflowMetadata.id,
forceCreateLastRunContext = isWorkflowRestarted
)

if (!created &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import org.opensearch.commons.alerting.model.Delegate
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
Expand Down Expand Up @@ -6380,4 +6381,106 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}
}
}

fun `test execute workflow when monitor is disabled and re-enabled`() {
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)

val index1 = "index_123"
createIndex(index1, Settings.EMPTY)
val q1 = DocLevelQuery(query = "properties:\"abcd\"", name = "1", fields = listOf())

val docLevelInput = DocLevelMonitorInput(
"description",
listOf(index1),
listOf(q1)
)

val customQueryIndex = "custom_alerts_index"

val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex
)
)

val monitorResponse = createMonitor(monitor)!!

val workflowRequest = randomWorkflow(
monitorIds = listOf(monitorResponse.id)
)
val workflowResponse = upsertWorkflow(workflowRequest)!!
val workflowId = workflowResponse.id
val getWorkflowResponse = getWorkflowById(id = workflowResponse.id)

assertNotNull(getWorkflowResponse)
assertEquals(workflowId, getWorkflowResponse.id)

// Verify that monitor workflow metadata exists
assertNotNull(searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata"))

val testDoc1 = """{
"properties": "abcd"
}"""
indexDoc(index1, "1", testDoc1)
indexDoc(index1, "2", testDoc1)
indexDoc(index1, "3", testDoc1)

// Run workflow
executeWorkflow(workflowRequest, workflowId, false)
var findings: List<Finding>
OpenSearchTestCase.waitUntil({
findings = searchFindings(monitorResponse.id)
if (findings.size >= 1) {
return@waitUntil true
}
return@waitUntil false
}, 30, TimeUnit.SECONDS)

// Verify that monitor workflow metadata is updated with lastRunContext
var monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata")
val lastRunContextBeforeDisable = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map<String, Any>)
assertEquals(2, lastRunContextBeforeDisable?.get("0"))

// Disable workflow
val disabledWorkflowRequest = randomWorkflow(
monitorIds = listOf(monitorResponse.id),
id = workflowId,
enabled = false
)
upsertWorkflow(disabledWorkflowRequest)
OpenSearchTestCase.waitUntil({
val workflowResponse = getWorkflowById(workflowId)
if (workflowResponse.workflow?.enabled == false) {
return@waitUntil true
}
return@waitUntil false
}, 30, TimeUnit.SECONDS)

// Index doc, since workflow is disabled, monitor workflow metadata shouldn't be updated
indexDoc(index1, "4", testDoc1)

// re-enable workflow
val enabledWorkflowRequest = randomWorkflow(
monitorIds = listOf(monitorResponse.id),
id = workflowId,
enabled = true
)
upsertWorkflow(enabledWorkflowRequest, method = RestRequest.Method.PUT, id = workflowId)
OpenSearchTestCase.waitUntil({
val workflowResponse = getWorkflowById(workflowId)
if (workflowResponse.workflow?.enabled == true) {
return@waitUntil true
}
return@waitUntil false
}, 30, TimeUnit.SECONDS)

// Verify that monitor workflow metadata exists
// Since workflow is re-enabled, last run context should be updated with latest sequence number
monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata")
assertNotNull(monitorWokflowMetadata)
val lastRunContext = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map<String, Any>)
assertEquals(3, lastRunContext?.get("0"))
}
}

0 comments on commit 4f9e27b

Please sign in to comment.