Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move get monitor and search monitor action / request / responses to common-utils #566

Merged
merged 4 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.opensearch.commons.alerting

import org.opensearch.action.search.SearchResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
Expand All @@ -17,6 +18,8 @@ import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.action.GetAlertsResponse
import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.GetMonitorRequest
import org.opensearch.commons.alerting.action.GetMonitorResponse
import org.opensearch.commons.alerting.action.GetWorkflowAlertsRequest
import org.opensearch.commons.alerting.action.GetWorkflowAlertsResponse
import org.opensearch.commons.alerting.action.GetWorkflowRequest
Expand All @@ -26,6 +29,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SearchMonitorRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.commons.utils.recreateObject
Expand Down Expand Up @@ -288,6 +292,51 @@ object AlertingPluginInterface {
)
}

/**
* Get Monitor interface.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/
fun getMonitor(
client: NodeClient,
request: GetMonitorRequest,
listener: ActionListener<GetMonitorResponse>
) {
client.execute(
AlertingActions.GET_MONITOR_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
GetMonitorResponse(
it
)
}
}
)
}

/**
* Search Monitors interface.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/
fun searchMonitors(
client: NodeClient,
request: SearchMonitorRequest,
listener: ActionListener<SearchResponse>
) {
client.execute(
AlertingActions.SEARCH_MONITORS_ACTION_TYPE,
request,
// we do not use the wrapActionListener in this case since there is no need
// to recreate any object or specially handle onResponse / onFailure. It is
// simply returning a SearchResponse.
listener
)
}

@Suppress("UNCHECKED_CAST")
private fun <Response : BaseResponse> wrapActionListener(
listener: ActionListener<Response>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionType
import org.opensearch.action.search.SearchResponse

object AlertingActions {
const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write"
Expand All @@ -18,6 +19,8 @@ object AlertingActions {
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"
const val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/chained_alerts/ack"
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"
const val GET_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/get"
const val SEARCH_MONITORS_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/search"

@JvmField
val INDEX_MONITOR_ACTION_TYPE =
Expand Down Expand Up @@ -60,4 +63,12 @@ object AlertingActions {
@JvmField
val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_TYPE =
ActionType(ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME, ::AcknowledgeAlertResponse)

@JvmField
val GET_MONITOR_ACTION_TYPE =
ActionType(GET_MONITOR_ACTION_NAME, ::GetMonitorResponse)

@JvmField
val SEARCH_MONITORS_ACTION_TYPE =
ActionType(SEARCH_MONITORS_ACTION_NAME, ::SearchResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.rest.RestRequest
import org.opensearch.search.fetch.subphase.FetchSourceContext
import java.io.IOException

class GetMonitorRequest : ActionRequest {
val monitorId: String
val version: Long
val method: RestRequest.Method
val srcContext: FetchSourceContext?

constructor(
monitorId: String,
version: Long,
method: RestRequest.Method,
srcContext: FetchSourceContext?
) : super() {
this.monitorId = monitorId
this.version = version
this.method = method
this.srcContext = srcContext
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // monitorId
sin.readLong(), // version
sin.readEnum(RestRequest.Method::class.java), // method
if (sin.readBoolean()) {
FetchSourceContext(sin) // srcContext
} else null
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(monitorId)
out.writeLong(version)
out.writeEnum(method)
out.writeBoolean(srcContext != null)
srcContext?.writeTo(out)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID
import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM
import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO
import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentFragment
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

class GetMonitorResponse : BaseResponse {
var id: String
var version: Long
var seqNo: Long
var primaryTerm: Long
var monitor: Monitor?
var associatedWorkflows: List<AssociatedWorkflow>?

constructor(
id: String,
version: Long,
seqNo: Long,
primaryTerm: Long,
monitor: Monitor?,
associatedCompositeMonitors: List<AssociatedWorkflow>?,
) : super() {
this.id = id
this.version = version
this.seqNo = seqNo
this.primaryTerm = primaryTerm
this.monitor = monitor
this.associatedWorkflows = associatedCompositeMonitors ?: emptyList()
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
id = sin.readString(), // id
version = sin.readLong(), // version
seqNo = sin.readLong(), // seqNo
primaryTerm = sin.readLong(), // primaryTerm
monitor = if (sin.readBoolean()) {
Monitor.readFrom(sin) // monitor
} else null,
associatedCompositeMonitors = sin.readList((AssociatedWorkflow)::readFrom),
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
out.writeLong(seqNo)
out.writeLong(primaryTerm)
if (monitor != null) {
out.writeBoolean(true)
monitor?.writeTo(out)
} else {
out.writeBoolean(false)
}
associatedWorkflows?.forEach {
it.writeTo(out)
}
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(_ID, id)
.field(_VERSION, version)
.field(_SEQ_NO, seqNo)
.field(_PRIMARY_TERM, primaryTerm)
if (monitor != null) {
builder.field("monitor", monitor)
}
if (associatedWorkflows != null) {
builder.field("associated_workflows", associatedWorkflows!!.toTypedArray())
}
return builder.endObject()
}

class AssociatedWorkflow : ToXContentFragment {
val id: String
val name: String

constructor(id: String, name: String) {
this.id = id
this.name = name
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
builder.startObject()
.field("id", id)
.field("name", name)
.endObject()
return builder
}

fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeString(name)
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(),
sin.readString()
)

companion object {
@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): AssociatedWorkflow {
return AssociatedWorkflow(sin)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.search.SearchRequest
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException

class SearchMonitorRequest : ActionRequest {

val searchRequest: SearchRequest

constructor(
searchRequest: SearchRequest
) : super() {
this.searchRequest = searchRequest
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
searchRequest = SearchRequest(sin)
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
searchRequest.writeTo(out)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.mockito.Mockito
import org.mockito.Mockito.mock
import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.ActionType
import org.opensearch.action.search.SearchResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
Expand All @@ -23,6 +24,8 @@ import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.action.GetAlertsResponse
import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.GetMonitorRequest
import org.opensearch.commons.alerting.action.GetMonitorResponse
import org.opensearch.commons.alerting.action.GetWorkflowAlertsRequest
import org.opensearch.commons.alerting.action.GetWorkflowAlertsResponse
import org.opensearch.commons.alerting.action.GetWorkflowRequest
Expand All @@ -32,6 +35,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SearchMonitorRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.model.FindingDocument
import org.opensearch.commons.alerting.model.FindingWithDocs
Expand Down Expand Up @@ -269,4 +273,32 @@ internal class AlertingPluginInterfaceTests {
AlertingPluginInterface.acknowledgeChainedAlerts(client, request, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

@Test
fun getMonitor() {
val request = mock(GetMonitorRequest::class.java)
val response = GetMonitorResponse("test-id", 1, 1, 1, null, null)
val listener: ActionListener<GetMonitorResponse> =
mock(ActionListener::class.java) as ActionListener<GetMonitorResponse>
Mockito.doAnswer {
(it.getArgument(2) as ActionListener<GetMonitorResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())
AlertingPluginInterface.getMonitor(client, request, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

@Test
fun searchMonitors() {
val request = mock(SearchMonitorRequest::class.java)
val response = mock(SearchResponse::class.java)
val listener: ActionListener<SearchResponse> =
mock(ActionListener::class.java) as ActionListener<SearchResponse>
Mockito.doAnswer {
(it.getArgument(2) as ActionListener<SearchResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())
AlertingPluginInterface.searchMonitors(client, request, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}
}
Loading