Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alias index transform #1049

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e5631ac
adding write index check for alias of target index
n-dohrmann Nov 21, 2023
7fbde10
trying out putMapping changes
n-dohrmann Nov 29, 2023
e0274ec
linting previous change
n-dohrmann Nov 29, 2023
f866378
variable target index PR ready for discussion...
n-dohrmann Nov 29, 2023
4abb60f
Merge branch 'main' into alias_index_transform
n-dohrmann Nov 29, 2023
e687f5f
linting previous commit
n-dohrmann Nov 29, 2023
f70faab
Merge branch 'alias_index_transform' of github.com:n-dohrmann/index-m…
n-dohrmann Nov 29, 2023
a3586f5
reduce throw count to < 2 in createTargetIndex
n-dohrmann Nov 29, 2023
024696c
adding write index check for alias of target index
n-dohrmann Nov 21, 2023
3e43d36
trying out putMapping changes
n-dohrmann Nov 29, 2023
123378b
linting previous change
n-dohrmann Nov 29, 2023
ef70a74
variable target index PR ready for discussion...
n-dohrmann Nov 29, 2023
3be1246
linting previous commit
n-dohrmann Nov 29, 2023
4138ff0
reduce throw count to < 2 in createTargetIndex
n-dohrmann Nov 29, 2023
7db746d
Merge branch 'alias_index_transform' of github.com:n-dohrmann/index-m…
n-dohrmann Nov 30, 2023
01bd3a2
changing alias checker control flow
n-dohrmann Dec 5, 2023
ab17632
adding test case for aliased transform target index
n-dohrmann Dec 6, 2023
448161b
quick commit before changing branches
n-dohrmann Dec 14, 2023
0b02a23
Merge branch 'main' into alias_index_transform
n-dohrmann Dec 14, 2023
e50468b
adding code for quick question
n-dohrmann Dec 15, 2023
7dd3530
adding to target alias transform test
n-dohrmann Dec 15, 2023
ce65e6c
Merge branch 'main' into alias_index_transform
n-dohrmann Dec 15, 2023
a738944
adding explicit variable for sourceIndex
n-dohrmann Dec 15, 2023
0bb9ce8
adding unchecked cast suppressor to test method
n-dohrmann Dec 15, 2023
7cf342a
Merge branch 'main' into alias_index_transform
bowenlan-amzn Jan 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.bulk.BulkItemResponse
import org.opensearch.action.bulk.BulkRequest
Expand All @@ -26,6 +27,7 @@
import org.opensearch.indexmanagement.transform.util.TransformContext
import org.opensearch.core.rest.RestStatus
import org.opensearch.transport.RemoteTransportException
import org.opensearch.action.support.master.AcknowledgedResponse

@Suppress("ComplexMethod")
class TransformIndexer(
Expand Down Expand Up @@ -63,6 +65,20 @@
throw TransformIndexException("Failed to create the target index")
}
}
if (clusterService.state().metadata.hasAlias(targetIndex)) {
// return error if no write index with the alias
val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex

Check warning on line 70 in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt#L70

Added line #L70 was not covered by tests
if (writeIndexMetadata == null) {
throw TransformIndexException("target_index [$targetIndex] is an alias but doesn't have write index")

Check warning on line 72 in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt#L72

Added line #L72 was not covered by tests
}
val putMappingReq = PutMappingRequest(writeIndexMetadata.index?.name).source(targetFieldMappings)
val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil {
putMapping(putMappingReq)

Check warning on line 76 in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt#L75-L76

Added lines #L75 - L76 were not covered by tests
}
if (!mapResp.isAcknowledged) {
logger.error("Target index mapping request failed")

Check warning on line 79 in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt#L79

Added line #L79 was not covered by tests
}
}
}

@Suppress("ThrowsCount", "RethrowCaughtException")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import org.opensearch.indexmanagement.waitFor
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.rest.RestRequest
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import org.opensearch.search.aggregations.AggregationBuilders
Expand Down Expand Up @@ -1430,6 +1432,99 @@ class TransformRunnerIT : TransformRestTestCase() {
disableTransform(transform.id)
}

@Suppress("UNCHECKED_CAST")
fun `test transform with wildcard, aliased target index`() {
val sourceIndex = "source-index"
validateSourceIndex(sourceIndex)

// create alias
val indexAlias = "wildcard_index_alias"
val resolvedTargetIndex = "resolved_target_index"
val builtSettings = Settings.builder().let {
it.put(INDEX_NUMBER_OF_REPLICAS, 1)
it.put(INDEX_NUMBER_OF_SHARDS, 1)
it
}.build()
val aliases = "\"$indexAlias\": { \"is_write_index\": true }"
createIndex(resolvedTargetIndex, builtSettings, null, aliases)

refreshAllIndices()
val pickupDateTime = "tpep_pickup_datetime"
val fareAmount = "fare_amount"

val transform = Transform(
id = "id_18",
schemaVersion = 1L,
enabled = true,
enabledAt = Instant.now(),
updatedAt = Instant.now(),
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
description = "test transform",
metadataId = null,
sourceIndex = sourceIndex,
targetIndex = indexAlias,
roles = emptyList(),
pageSize = 100,
groups = listOf(
Terms(sourceField = pickupDateTime, targetField = pickupDateTime)
),
aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount))
).let { createTransform(it, it.id) }

updateTransformStartTime(transform)

waitFor { assertTrue("Target transform index was not created", indexExists(resolvedTargetIndex)) }

waitFor {
val job = getTransform(transformId = transform.id)
assertNotNull("Transform job doesn't have metadata set", job.metadataId)
val transformMetadata = getTransformMetadata(job.metadataId!!)
assertEquals("Transform had not finished", TransformMetadata.Status.FINISHED, transformMetadata.status)
}
// TODO - make sure we've written to the correct index!
val sourceIndexMapping = client().makeRequest("GET", "/$sourceIndex/_mapping")
val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map<String, Map<String, Any>>
val targetIndexMapping = client().makeRequest("GET", "/$indexAlias/_mapping")
val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map<String, Map<String, Any>>

// how to check if the results are correctly written to the write index of the alias?
val sourcePickupDate = (((sourceIndexParserMap[sourceIndex]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)["tpep_pickup_datetime"] as Map<String, Any>)["type"]
val targetPickupDate = (((targetIndexParserMap[indexAlias]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)["tpep_pickup_datetime"] as Map<String, Any>)["type"]

assertEquals(sourcePickupDate, targetPickupDate)

val pickupDateTimeTerm = "pickupDateTerm14"

val request = """
{
"size": 0,
"aggs": {
"$pickupDateTimeTerm": {
"terms": {
"field": "$pickupDateTime", "order": { "_key": "asc" }
},
"aggs": {
"avgFareAmount": { "avg": { "field": "$fareAmount" } } }
}
}
}
"""
var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIndex/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)

var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$indexAlias/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON))
assertTrue(transformRes.restStatus() == RestStatus.OK)

val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Map<String, Any>>>>>)[pickupDateTimeTerm]!!["buckets"]!!
val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Map<String, Any>>>>>)[pickupDateTimeTerm]!!["buckets"]!!
assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size)

// Verify the values of keys and metrics in all buckets
for (i in rawAggBuckets.indices) {
assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"])
assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"])
}
}
private fun getStrictMappings(): String {
return """
"dynamic": "strict",
Expand Down
Loading