Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollup QSQ follow-up #800

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsFilter
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIndices
Expand Down Expand Up @@ -123,6 +123,7 @@ import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.QueryShardContextFactory
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.snapshotmanagement.SMRunner
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestDeleteSMPolicyHandler
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestExplainSMPolicyHandler
Expand All @@ -138,7 +139,6 @@ import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.Trans
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.start.TransportStartSMAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.stop.TransportStopSMAction
import org.opensearch.indexmanagement.snapshotmanagement.SMRunner
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManagementSettings
Expand Down Expand Up @@ -382,7 +382,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
namedWriteableRegistry,
environment
)
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
rollupInterceptor = RollupInterceptor(clusterService, client, settings, indexNameExpressionResolver)
val jvmService = JvmService(environment.settings())
val transformRunner = TransformRunner.initialize(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

package org.opensearch.indexmanagement.rollup.action.index

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
Expand All @@ -15,26 +20,27 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory.jsonBuilder
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.authuser.User
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.parseRollup
import org.opensearch.indexmanagement.rollup.util.populateSourceIndexFieldMappings
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.indexmanagement.util.SecurityUtils
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
Expand All @@ -46,12 +52,14 @@ class TransportIndexRollupAction @Inject constructor(
val client: Client,
actionFilters: ActionFilters,
val indexManagementIndices: IndexManagementIndices,
val indexNameExpressionResolver: IndexNameExpressionResolver,
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry
) : HandledTransportAction<IndexRollupRequest, IndexRollupResponse>(
IndexRollupAction.NAME, transportService, actionFilters, ::IndexRollupRequest
) {
),
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("TransportIndexRollupAction")) {

@Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)

Expand Down Expand Up @@ -80,72 +88,78 @@ class TransportIndexRollupAction @Inject constructor(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
)}"
)
client.threadPool().threadContext.stashContext().use {
if (!validateUserConfiguration(user, filterByEnabled, actionListener)) {
return
}
indexManagementIndices.checkAndUpdateIMConfigIndex(ActionListener.wrap(::onCreateMappingsResponse, actionListener::onFailure))
}
}

private fun onCreateMappingsResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.")
if (request.opType() == DocWriteRequest.OpType.CREATE) {
if (!validateTargetIndexName()) {
return actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
launch {
try {
indexManagementIndices.checkAndUpdateIMConfigIndex(log)
log.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.")
if (request.opType() == DocWriteRequest.OpType.CREATE) {
if (!validateTargetIndexName()) {
actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
)
)
}
request.rollup = populateSourceIndexFieldMappings(
request.rollup,
indexNameExpressionResolver,
clusterService.state(),
client,
log
)
indexRollupDoc()
} else {
updateRollup()
}
putRollup()
} else {
getRollup()
} catch (e: Exception) {
actionListener.onFailure(e)
}
} else {
val message = "Unable to create or update $INDEX_MANAGEMENT_INDEX with newest mapping."
log.error(message)
actionListener.onFailure(OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR))
}
}

private fun getRollup() {
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.rollup.id)
client.get(getRequest, ActionListener.wrap(::onGetRollup, actionListener::onFailure))
}

@Suppress("ReturnCount")
private fun onGetRollup(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND))
return
}

val rollup: Rollup?
private suspend fun updateRollup() {
try {
rollup = parseRollup(response, xContentRegistry)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND))
return
}
if (!SecurityUtils.userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) {
return
}
val modified = modifiedImmutableProperties(rollup, request.rollup)
if (modified.isNotEmpty()) {
return actionListener.onFailure(OpenSearchStatusException("Not allowed to modify $modified", RestStatus.BAD_REQUEST))
}
if (!validateTargetIndexName()) {
return actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.rollup.id)
val getResponse: GetResponse = client.suspendUntil {
get(getRequest, it)
}

if (!getResponse.isExists) {
actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND))
return
}

val rollup: Rollup?
try {
rollup = parseRollup(getResponse, xContentRegistry)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND))
return
}
if (!SecurityUtils.userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) {
return
}
val modified = modifiedImmutableProperties(rollup, request.rollup)
if (modified.isNotEmpty()) {
return actionListener.onFailure(OpenSearchStatusException("Not allowed to modify $modified", RestStatus.BAD_REQUEST))
}
if (!validateTargetIndexName()) {
return actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
)
)
)
}

request.rollup = populateSourceIndexFieldMappings(request.rollup, indexNameExpressionResolver, clusterService.state(), client, log)

indexRollupDoc()
} catch (e: Exception) {
actionListener.onFailure(e)
}
putRollup()
}

private fun modifiedImmutableProperties(rollup: Rollup, newRollup: Rollup): List<String> {
Expand All @@ -158,35 +172,30 @@ class TransportIndexRollupAction @Inject constructor(
return modified.toList()
}

private fun putRollup() {
val rollup = request.rollup.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = this.user)
request.index(INDEX_MANAGEMENT_INDEX)
.id(request.rollup.id)
.source(rollup.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
.timeout(IndexRequest.DEFAULT_TIMEOUT)
client.index(
request,
object : ActionListener<IndexResponse> {
override fun onResponse(response: IndexResponse) {
if (response.shardInfo.failed > 0) {
val failureReasons = response.shardInfo.failures.joinToString(", ") { it.reason() }
actionListener.onFailure(OpenSearchStatusException(failureReasons, response.status()))
} else {
val status = if (request.opType() == DocWriteRequest.OpType.CREATE) RestStatus.CREATED else RestStatus.OK
actionListener.onResponse(
IndexRollupResponse(
response.id, response.version, response.seqNo, response.primaryTerm, status,
rollup.copy(seqNo = response.seqNo, primaryTerm = response.primaryTerm)
)
)
}
}

override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}
private suspend fun indexRollupDoc() {
try {
val rollup = request.rollup.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = this.user)
request.index(INDEX_MANAGEMENT_INDEX)
.id(request.rollup.id)
.source(rollup.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
.timeout(IndexRequest.DEFAULT_TIMEOUT)

val response: IndexResponse = client.suspendUntil { index(request, it) }
if (response.shardInfo.failed > 0) {
val failureReasons = response.shardInfo.failures.joinToString(", ") { it.reason() }
actionListener.onFailure(OpenSearchStatusException(failureReasons, response.status()))
} else {
val status = if (request.opType() == DocWriteRequest.OpType.CREATE) RestStatus.CREATED else RestStatus.OK
actionListener.onResponse(
IndexRollupResponse(
response.id, response.version, response.seqNo, response.primaryTerm, status,
rollup.copy(seqNo = response.seqNo, primaryTerm = response.primaryTerm)
)
)
}
)
} catch (e: Exception) {
actionListener.onFailure(e)
}
}

private fun validateTargetIndexName(): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@ import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.inject.Inject
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.core.xcontent.MediaType
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.MediaType
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.lang.Exception

class TransportUpdateRollupMappingAction @Inject constructor(
threadPool: ThreadPool,
Expand Down Expand Up @@ -101,15 +100,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
val updatedRollups = mapOf<String, Any>("rollups" to rollupJobEntries)
metaMappings[_META] = updatedRollups
} else {
if ((rollups as Map<*, *>).containsKey(request.rollup.id)) {
log.debug("Meta rollup mappings already contain rollup ${request.rollup.id} for index [$index]")
return listener.onFailure(
IllegalStateException("Meta rollup mappings already contain rollup ${request.rollup.id} for index [$index]")
)
}

// In this case rollup mappings exists and there is no entry for request.rollup.id
val rollupJobEntries = rollups.toMutableMap()
val rollupJobEntries = (rollups as Map<*, *>).toMutableMap()
rollupJobEntries[request.rollup.id] = rollup
val updatedRollups = mapOf<String, Any>("rollups" to rollupJobEntries)
metaMappings[_META] = updatedRollups
Expand Down
Loading