Skip to content

Commit

Permalink
refactor(run-id): refactor run id updates (#11834)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Nov 13, 2024
1 parent a10807d commit e672ade
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,11 @@ public List<String> batchIngestProposals(
opContext.getValidationContext().isAlternateValidation())
.build();

List<IngestResult> results = entityService.ingestProposal(opContext, batch, async);
entitySearchService.appendRunId(opContext, results);

Map<Pair<Urn, String>, List<IngestResult>> resultMap =
entityService.ingestProposal(opContext, batch, async).stream()
results.stream()
.collect(
Collectors.groupingBy(
result ->
Expand Down Expand Up @@ -864,8 +867,7 @@ public void rollbackIngestion(
private void tryIndexRunId(
@Nonnull OperationContext opContext, Urn entityUrn, @Nullable SystemMetadata systemMetadata) {
if (systemMetadata != null && systemMetadata.hasRunId()) {
entitySearchService.appendRunId(
opContext, entityUrn.getEntityType(), entityUrn, systemMetadata.getRunId());
entitySearchService.appendRunId(opContext, entityUrn, systemMetadata.getRunId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@ public void deleteDocument(

@Override
public void appendRunId(
@Nonnull OperationContext opContext,
@Nonnull String entityName,
@Nonnull Urn urn,
@Nullable String runId) {
@Nonnull OperationContext opContext, @Nonnull Urn urn, @Nullable String runId) {
final String docId = indexBuilders.getIndexConvention().getEntityDocumentId(urn);

log.debug(
"Appending run id for entity name: {}, doc id: {}, run id: {}", entityName, docId, runId);
"Appending run id for entity name: {}, doc id: {}, run id: {}",
urn.getEntityType(),
docId,
runId);
esWriteDAO.applyScriptUpdate(
opContext,
entityName,
urn.getEntityType(),
docId,
/*
Script used to apply updates to the runId field of the index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.aspect.EnvelopedAspectArray;
import com.linkedin.metadata.aspect.VersionedAspect;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.entity.validation.ValidationException;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.resources.operations.Utils;
Expand Down Expand Up @@ -57,6 +59,8 @@
import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -126,6 +130,11 @@ void setSystemOperationContext(OperationContext systemOperationContext) {
this.systemOperationContext = systemOperationContext;
}

@VisibleForTesting
void setEntitySearchService(EntitySearchService entitySearchService) {
this.entitySearchService = entitySearchService;
}

/**
* Retrieves the value for an entity that is made up of latest versions of specified aspects.
* TODO: Get rid of this and migrate to getAspect.
Expand Down Expand Up @@ -320,15 +329,7 @@ private Task<String> ingestProposals(

List<IngestResult> results =
_entityService.ingestProposal(opContext, batch, asyncBool);

for (IngestResult result : results) {
// Update runIds, only works for existing documents, so ES document must exist
Urn resultUrn = result.getUrn();

if (resultUrn != null && (result.isProcessedMCL() || result.isUpdate())) {
tryIndexRunId(opContext, resultUrn, result.getRequest().getSystemMetadata(), entitySearchService);
}
}
entitySearchService.appendRunId(opContext, results);

// TODO: We don't actually use this return value anywhere. Maybe we should just stop returning it altogether?
return RESTLI_SUCCESS;
Expand Down Expand Up @@ -397,14 +398,4 @@ public Task<String> restoreIndices(
},
MetricRegistry.name(this.getClass(), "restoreIndices"));
}

private static void tryIndexRunId(
@Nonnull final OperationContext opContext,
final Urn urn,
final @Nullable SystemMetadata systemMetadata,
final EntitySearchService entitySearchService) {
if (systemMetadata != null && systemMetadata.hasRunId()) {
entitySearchService.appendRunId(opContext, urn.getEntityType(), urn, systemMetadata.getRunId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.service.UpdateIndicesService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.GenericAspect;
Expand Down Expand Up @@ -66,6 +67,7 @@ public void setup() {
aspectResource.setEntityService(entityService);
opContext = TestOperationContexts.systemContextNoSearchAuthorization();
aspectResource.setSystemOperationContext(opContext);
aspectResource.setEntitySearchService(mock(EntitySearchService.class));
entityRegistry = opContext.getEntityRegistry();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package com.linkedin.metadata.search;

import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.browse.BrowseResult;
import com.linkedin.metadata.browse.BrowseResultV2;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opensearch.action.explain.ExplainResponse;
Expand Down Expand Up @@ -59,15 +65,10 @@ void deleteDocument(
/**
* Appends a run id to the list for a certain document
*
* @param entityName name of the entity
* @param urn the urn of the user
* @param runId the ID of the run
*/
void appendRunId(
@Nonnull OperationContext opContext,
@Nonnull String entityName,
@Nonnull Urn urn,
@Nullable String runId);
void appendRunId(@Nonnull OperationContext opContext, @Nonnull Urn urn, @Nullable String runId);

/**
* Gets a list of documents that match given search request. The results are aggregated and
Expand Down Expand Up @@ -329,4 +330,41 @@ ExplainResponse explain(
* @return convent
*/
IndexConvention getIndexConvention();

default void appendRunId(
@Nonnull final OperationContext opContext, @Nonnull List<IngestResult> results) {

// Only updates with runId
Map<Pair<Urn, String>, Set<BatchItem>> urnRunIdToBatchItem =
results.stream()
.filter(Objects::nonNull)
.filter(
result -> result.getUrn() != null && (result.isProcessedMCL() || result.isUpdate()))
.filter(
result ->
result.getRequest() != null
&& result.getRequest().getSystemMetadata() != null
&& result.getRequest().getSystemMetadata().hasRunId())
.map(
result ->
Map.entry(
Pair.of(
result.getUrn(), result.getRequest().getSystemMetadata().getRunId()),
result))
.collect(
Collectors.groupingBy(
Map.Entry::getKey,
Collectors.mapping(e -> e.getValue().getRequest(), Collectors.toSet())));

// Only update if not key aspect (document doesn't exist)
urnRunIdToBatchItem.entrySet().stream()
.filter(
entry ->
entry.getValue().stream()
.noneMatch(
item ->
item.getEntitySpec().getKeyAspectName().equals(item.getAspectName())))
.forEach(
entry -> appendRunId(opContext, entry.getKey().getKey(), entry.getKey().getValue()));
}
}
Loading

0 comments on commit e672ade

Please sign in to comment.