Skip to content

Commit

Permalink
Add execute streaming workflow changes
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <engechas@amazon.com>
  • Loading branch information
engechas committed Feb 12, 2024
1 parent 00147e4 commit 113ffc8
Show file tree
Hide file tree
Showing 14 changed files with 498 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse
import org.opensearch.commons.alerting.action.ExecuteStreamingWorkflowRequest
import org.opensearch.commons.alerting.action.ExecuteStreamingWorkflowResponse
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.action.GetAlertsResponse
import org.opensearch.commons.alerting.action.GetFindingsRequest
Expand Down Expand Up @@ -337,6 +339,30 @@ object AlertingPluginInterface {
)
}

/**
* Execute streaming Workflow interface.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/
fun executeStreamingWorkflow(
client: NodeClient,
request: ExecuteStreamingWorkflowRequest,
listener: ActionListener<ExecuteStreamingWorkflowResponse>
) {
client.execute(
AlertingActions.EXECUTE_STREAMING_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
ExecuteStreamingWorkflowResponse(
it
)
}
}
)
}

@Suppress("UNCHECKED_CAST")
private fun <Response : BaseResponse> wrapActionListener(
listener: ActionListener<Response>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ object AlertingActions {
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"
const val GET_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/get"
const val SEARCH_MONITORS_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/search"
const val EXECUTE_STREAMING_WORKFLOW_ACTION_NAME = "cluster:admin/opendistro/alerting/workflow/streaming/execute"

@JvmField
val INDEX_MONITOR_ACTION_TYPE =
Expand Down Expand Up @@ -71,4 +72,8 @@ object AlertingActions {
@JvmField
val SEARCH_MONITORS_ACTION_TYPE =
ActionType(SEARCH_MONITORS_ACTION_NAME, ::SearchResponse)

@JvmField
val EXECUTE_STREAMING_WORKFLOW_ACTION_TYPE =
ActionType(EXECUTE_STREAMING_WORKFLOW_ACTION_NAME, ::ExecuteStreamingWorkflowResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.model.StreamingIndex
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException

class ExecuteStreamingWorkflowRequest : ActionRequest {
var workflowId: String
var indices: List<StreamingIndex>

constructor(workflowId: String, indices: List<StreamingIndex>) : super() {
this.workflowId = workflowId
this.indices = indices
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
workflowId = sin.readString(),
indices = sin.readList(::StreamingIndex)
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(workflowId)
out.writeCollection(indices)
}

companion object {
const val WORKFLOW_ID_FIELD = "workflowId"
const val INDICES_FIELD = "indices"

@JvmStatic
@JvmOverloads
@Throws(IOException::class)
fun parse(xcp: XContentParser): ExecuteStreamingWorkflowRequest {
var workflowId: String? = null
var indices: MutableList<StreamingIndex> = mutableListOf()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
WORKFLOW_ID_FIELD -> workflowId = xcp.text()
INDICES_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
val index = StreamingIndex.parse(xcp)
indices.add(index)
}
}

else -> {
xcp.skipChildren()
}
}
}

return ExecuteStreamingWorkflowRequest(
requireNotNull(workflowId) { "workflowId is null" },
indices
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.alerting.action

import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

// TODO - return more info so that it can be passed back to the client if there are failures?
class ExecuteStreamingWorkflowResponse : BaseResponse {
private var status: RestStatus

constructor(status: RestStatus) : super() {
this.status = status
}

@Throws(IOException::class)
constructor(sin: StreamInput) {
this.status = sin.readEnum(RestStatus::class.java)
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeEnum(status)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("status", status.status)
return builder.endObject()
}

override fun getStatus(): RestStatus {
return this.status
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class GetMonitorResponse : BaseResponse {
monitor = if (sin.readBoolean()) {
Monitor.readFrom(sin) // monitor
} else null,
associatedCompositeMonitors = sin.readList((AssociatedWorkflow)::readFrom),
// TODO - find correct solution for this instead of setting to null
associatedCompositeMonitors = null
)

@Throws(IOException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ data class DocLevelQuery(
val id: String = UUID.randomUUID().toString(),
val name: String,
val query: String,
val tags: List<String> = mutableListOf()
val tags: List<String> = mutableListOf(),
val queryFieldNames: List<String> = mutableListOf(),
) : BaseModel {

init {
Expand All @@ -31,15 +32,17 @@ data class DocLevelQuery(
sin.readString(), // id
sin.readString(), // name
sin.readString(), // query
sin.readStringList() // tags
sin.readStringList(), // tags
sin.readStringList() // fieldsBeingQueried
)

fun asTemplateArg(): Map<String, Any> {
return mapOf(
QUERY_ID_FIELD to id,
NAME_FIELD to name,
QUERY_FIELD to query,
TAGS_FIELD to tags
TAGS_FIELD to tags,
QUERY_FIELD_NAMES_FIELD to queryFieldNames
)
}

Expand All @@ -49,6 +52,7 @@ data class DocLevelQuery(
out.writeString(name)
out.writeString(query)
out.writeStringCollection(tags)
out.writeStringCollection(queryFieldNames)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -57,6 +61,7 @@ data class DocLevelQuery(
.field(NAME_FIELD, name)
.field(QUERY_FIELD, query)
.field(TAGS_FIELD, tags.toTypedArray())
.field(QUERY_FIELD_NAMES_FIELD, queryFieldNames.toTypedArray())
.endObject()
return builder
}
Expand All @@ -66,6 +71,7 @@ data class DocLevelQuery(
const val NAME_FIELD = "name"
const val QUERY_FIELD = "query"
const val TAGS_FIELD = "tags"
const val QUERY_FIELD_NAMES_FIELD = "query_field_names"
const val NO_ID = ""
val INVALID_CHARACTERS: List<String> = listOf(" ", "[", "]", "{", "}", "(", ")")

Expand All @@ -76,6 +82,7 @@ data class DocLevelQuery(
lateinit var query: String
lateinit var name: String
val tags: MutableList<String> = mutableListOf()
val queryFieldNames: MutableList<String> = mutableListOf()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -101,14 +108,26 @@ data class DocLevelQuery(
tags.add(tag)
}
}
QUERY_FIELD_NAMES_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
val field = xcp.text()
queryFieldNames.add(field)
}
}
}
}

return DocLevelQuery(
id = id,
name = name,
query = query,
tags = tags
tags = tags,
queryFieldNames = queryFieldNames
)
}

Expand All @@ -129,4 +148,18 @@ data class DocLevelQuery(
}
}
}

// constructor for java plugins' convenience to optionally avoid passing empty list for 'fieldsBeingQueried' field
constructor(
id: String,
name: String,
query: String,
tags: MutableList<String>,
) : this(
id = id,
name = name,
query = query,
tags = tags,
queryFieldNames = emptyList()
)
}
87 changes: 87 additions & 0 deletions src/main/kotlin/org/opensearch/commons/alerting/model/IdDocPair.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.alerting.model

import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException

class IdDocPair : ToXContentObject, Writeable {
var docId: String
var document: BytesReference

constructor(docId: String, document: BytesReference) : super() {
this.docId = docId
this.document = document
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
docId = sin.readString(),
document = sin.readBytesReference()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(docId)
out.writeBytesReference(document)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("docId", docId)
.field("document", document)
return builder.endObject()
}

@Throws(IOException::class)
fun readFrom(sin: StreamInput): IdDocPair {
return IdDocPair(sin)
}

companion object {
const val DOC_ID_FIELD = "docId"
const val DOCUMENT_FIELD = "document"

@JvmStatic
@JvmOverloads
@Throws(IOException::class)
fun parse(xcp: XContentParser): IdDocPair {
var docId: String? = null
var document: BytesReference? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
DOC_ID_FIELD -> docId = xcp.text()
DOCUMENT_FIELD -> {
val xContentBuilder = XContentBuilder.builder(xcp.contentType().xContent())
xContentBuilder.copyCurrentStructure(xcp)
document = BytesReference.bytes(xContentBuilder)
}

else -> {
xcp.skipChildren()
}
}
}

return IdDocPair(
requireNotNull(docId) { "docId is null" },
requireNotNull(document) { "document is null" }
)
}
}
}
Loading

0 comments on commit 113ffc8

Please sign in to comment.