Skip to content

Commit

Permalink
logging for transform
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 committed Apr 3, 2024
1 parent e5a13ff commit 0a29a2c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ object TransformRunner :
val transformLockManager = transformContext.transformLockManager
transformLockManager.acquireLockForScheduledJob()
try {
logger.debug("Transform Job ${transform.id} Initiating.")
do {
when {
transformLockManager.lock == null -> {
Expand All @@ -141,13 +142,19 @@ object TransformRunner :
// If we have not populated the list of shards to search, do so now
if (bucketsToTransform.shardsToSearch == null) {
// Note the timestamp when we got the shard global checkpoints to the user may know what data is included
logger.debug("Transform job ${transform.id} is populating shards to search for index ${transform.sourceIndex}.")
newGlobalCheckpointTime = Instant.now()
newGlobalCheckpoints = transformSearchService.getShardsGlobalCheckpoint(transform.sourceIndex)
bucketsToTransform =
bucketsToTransform.initializeShardsToSearch(
metadata.shardIDToGlobalCheckpoint,
newGlobalCheckpoints,
)
logger.debug(
"Transform job {} fetched global checkpoints {}.",
transform.id,
newGlobalCheckpoints
)
}
// If there are shards to search do it here
if (bucketsToTransform.currentShard != null) {
Expand All @@ -161,6 +168,13 @@ object TransformRunner :
bucketsToTransform.modifiedBuckets.filter {
transformProcessedBucketLog.isNotProcessed(it)
}.toMutableSet()

logger.debug(
"Transform job {} recompute to start with modified buckets {}. Processing shard {}.",
transform.id,
modifiedBuckets.size,
bucketsToTransform.currentShard
)
// Recompute modified buckets and update them in targetIndex
currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext)
// Add processed buckets to 'processed set' so that we don't try to reprocess them again
Expand Down Expand Up @@ -229,6 +243,7 @@ object TransformRunner :
)
}
currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets)
logger.debug("Transform job {} has current buckets {} to transform. Processing Shard {} with checkpoints from {} to {}.", transform.id,currentBucketsToTransform.modifiedBuckets.size, currentShard.shardId ,currentShard.from, currentShard.to)
val mergedSearchTime =
currentBucketsToTransform.metadata.stats.searchTimeInMillis +
shardLevelModifiedBuckets.searchTimeInMillis
Expand Down Expand Up @@ -325,7 +340,9 @@ object TransformRunner :
}
val indexTimeInMillis =
withTransformSecurityContext(transform) {
logger.debug("Transform job {} starting to index for target index: {} with documents {}.", transform.id,transform.targetIndex, transformSearchResult.docsToIndex.size)
transformIndexer.index(transform.targetIndex, transformSearchResult.docsToIndex, transformContext)
logger.debug("Transform job {} completed to index for target index: {}.", transform.id, transform.targetIndex)
}
val stats = transformSearchResult.stats
val updatedStats =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,17 @@ class TransformSearchService(
return@suspendUntil
}
val request = getShardLevelBucketsSearchRequest(transform, afterKey, pageSize, currentShard, searchRequestTimeoutInSeconds)
logger.debug("Transform job {} is starting its search request {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, request.source(), currentShard.from, currentShard.to)
search(request, listener)
logger.debug("Transform job {} has completed search request for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to)
}
}
// If the request was successful, update page size
transformContext.lastSuccessfulPageSize = pageSize
logger.debug("Transform job {} updated page size {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, pageSize, currentShard.shardId, currentShard.from, currentShard.to)
transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart)
logger.debug("Transform job {} is renewing lock for long search for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to, searchResponse.took.millis())
logger.trace("Transform job {} search response {} for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", transform.id, searchResponse, currentShard.shardId, currentShard.from, currentShard.to, searchResponse.took.millis())
return convertBucketSearchResponse(transform, searchResponse)
} catch (e: TransformSearchServiceException) {
throw e
Expand Down

0 comments on commit 0a29a2c

Please sign in to comment.