Skip to content
This repository has been archived by the owner on Jun 10, 2024. It is now read-only.

Commit

Permalink
Merge pull request #302 from cmgrote/master
Browse files Browse the repository at this point in the history
Implements entity and relationship delete events
  • Loading branch information
cmgrote authored Jun 9, 2021
2 parents 19c4815 + 1724514 commit c807cc4
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.typedefs.TypeDefPatch;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.repositoryeventmapper.OMRSRepositoryEventMapperBase;
import org.odpi.openmetadata.repositoryservices.ffdc.exception.RepositoryErrorException;
import org.odpi.openmetadata.repositoryservices.ffdc.exception.TypeErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -179,9 +180,10 @@ public void processEvent(String event) {
case ENTITY_UPDATE:
processUpdatedEntity(entityNotification.getEntity());
break;
/*case ENTITY_DELETE:
case ENTITY_DELETE:
processDeletedEntity(entityNotification.getEntity());
break;
case CLASSIFICATION_ADD:
/*case CLASSIFICATION_ADD:
break;
case CLASSIFICATION_UPDATE:
break;
Expand All @@ -191,9 +193,10 @@ public void processEvent(String event) {
processNewRelationship(entityNotification.getRelationship());
break;
/*case RELATIONSHIP_UPDATE:
break;
case RELATIONSHIP_DELETE:
break;*/
case RELATIONSHIP_DELETE:
processDeletedRelationship(entityNotification.getRelationship());
break;
default:
log.warn("Unrecognized operation type from Apache Atlas: {}", event);
break;
Expand Down Expand Up @@ -233,7 +236,7 @@ private void processNewEntity(AtlasEntityHeader atlasEntityHeader) {
entityDetail
);
// If the entity was mapped, also send events for any self-referencing (generated) relationships
List<Relationship> generatedRelationships = getGeneratedRelationshipsForEntity(atlasEntityHeader, entityDetail);
List<Relationship> generatedRelationships = getGeneratedRelationshipsForEntity(atlasEntityHeader, prefix);
for (Relationship generatedRelationship : generatedRelationships) {
repositoryEventProcessor.processNewRelationshipEvent(
sourceName,
Expand Down Expand Up @@ -285,7 +288,7 @@ private void processUpdatedEntity(AtlasEntityHeader atlasEntityHeader) {
entityDetail
);
// If the entity was mapped, also send events for any self-referencing (generated) relationships
List<Relationship> generatedRelationships = getGeneratedRelationshipsForEntity(atlasEntityHeader, entityDetail);
List<Relationship> generatedRelationships = getGeneratedRelationshipsForEntity(atlasEntityHeader, prefix);
for (Relationship generatedRelationship : generatedRelationships) {
// TODO: find a way to pull back the old version to send in the update event
// (for now we will just fake one based on the latest version of the relationship)
Expand All @@ -310,15 +313,112 @@ private void processUpdatedEntity(AtlasEntityHeader atlasEntityHeader) {
}
}

/**
* Processes and sends an OMRS event for the deleted Apache Atlas entity.
*
* @param atlasEntityHeader the deleted Apache Atlas entity information
*/
private void processDeletedEntity(AtlasEntityHeader atlasEntityHeader) {
Map<String, String> omrsTypesByPrefix = typeDefStore.getAllMappedOMRSTypeDefNames(atlasEntityHeader.getTypeName());
if (omrsTypesByPrefix != null) {
// Note: these mappings will be for both entities and self-referencing relationships.
for (Map.Entry<String, String> entry : omrsTypesByPrefix.entrySet()) {
String prefix = entry.getKey();
String omrsTypeName = entry.getValue();
// We need to ensure this OMRS type is actually for an entity before attempting to map it as one
TypeDef typeDef = typeDefStore.getTypeDefByName(omrsTypeName);
if (typeDef.getCategory() == TypeDefCategory.ENTITY_DEF) {
AtlasGuid entityGUID = new AtlasGuid(atlasEntityHeader.getGuid(), prefix);
try {
EntityDetail entityDetail = repositoryHelper.getSkeletonEntity(
sourceName,
metadataCollectionId,
InstanceProvenanceType.LOCAL_COHORT,
null,
typeDef.getName()
);
entityDetail.setGUID(entityGUID.toString());
repositoryEventProcessor.processDeletedEntityEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
entityDetail
);
// Also send events for any self-referencing (generated) relationships
List<Relationship> generatedRelationships = getGeneratedRelationshipsForEntity(atlasEntityHeader, prefix);
for (Relationship generatedRelationship : generatedRelationships) {
repositoryEventProcessor.processDeletedRelationshipEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
generatedRelationship
);
}
} catch (TypeErrorException e) {
log.error("Unable to process an entity delete for: {}", entityGUID, e);
}
}
}
} else {
log.warn("Unable to process an entity delete for unmapped type: {}", atlasEntityHeader.getTypeName());
}
}

/**
* Processes and sends an OMRS event for the deleted Apache Atlas relationship.
*
* @param atlasRelationshipHeader the deleted Apache Atlas relationship information
*/
private void processDeletedRelationship(AtlasRelationshipHeader atlasRelationshipHeader) {
Map<String, String> omrsTypesByPrefix = typeDefStore.getAllMappedOMRSTypeDefNames(atlasRelationshipHeader.getTypeName());
if (omrsTypesByPrefix != null) {
for (Map.Entry<String, String> entry : omrsTypesByPrefix.entrySet()) {
String prefix = entry.getKey();
String omrsTypeName = entry.getValue();
// We need to ensure this OMRS type is actually for a relationship before attempting to map it as one
TypeDef typeDef = typeDefStore.getTypeDefByName(omrsTypeName);
if (typeDef.getCategory() == TypeDefCategory.RELATIONSHIP_DEF) {
AtlasGuid relationshipGUID = new AtlasGuid(atlasRelationshipHeader.getGuid(), prefix);
try {
Relationship relationship = repositoryHelper.getSkeletonRelationship(
sourceName,
metadataCollectionId,
InstanceProvenanceType.LOCAL_COHORT,
null,
typeDef.getName()
);
relationship.setGUID(relationshipGUID.toString());
repositoryEventProcessor.processDeletedRelationshipEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
relationship
);
} catch (TypeErrorException e) {
log.error("Unable to process a relationship delete for: {}", relationshipGUID, e);
}
}
}
} else {
log.warn("Unable to process a relationship delete for unmapped type: {}", atlasRelationshipHeader.getTypeName());
}
}

/**
* Generate any pseudo-relationships for the provided entity.
*
* @param atlasEntityHeader the Atlas entity for which to generate pseudo-relationships
* @param entityDetail the EntityDetail for which to generate pseudo-relationships
* @param prefix if the Atlas entity is a generated one
* @return {@code List<Relationship>}
*/
private List<Relationship> getGeneratedRelationshipsForEntity(AtlasEntityHeader atlasEntityHeader,
EntityDetail entityDetail) {
String prefix) {

String atlasTypeName = atlasEntityHeader.getTypeName();
List<Relationship> generatedRelationships = new ArrayList<>();
Expand All @@ -338,10 +438,10 @@ private List<Relationship> getGeneratedRelationshipsForEntity(AtlasEntityHeader
if (generatedRelationship != null) {
generatedRelationships.add(generatedRelationship);
} else {
log.warn("Unable to create generated relationship with prefix {}, for entity: {}", relationshipPrefix, entityDetail.getGUID());
log.warn("Unable to create generated relationship with prefix {}, for entity: {}", relationshipPrefix, new AtlasGuid(atlasEntityHeader.getGuid(), prefix));
}
} catch (RepositoryErrorException e) {
log.error("Unable to create generated relationship with prefix {}, for entity: {}", relationshipPrefix, entityDetail.getGUID(), e);
log.error("Unable to create generated relationship with prefix {}, for entity: {}", relationshipPrefix, new AtlasGuid(atlasEntityHeader.getGuid(), prefix), e);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
<mock-server.version>5.11.2</mock-server.version>
<sleepycat.version>18.3.12</sleepycat.version>
<rhino.version>1.7.13</rhino.version>
<nimbus.version>9.9.3</nimbus.version>
<nimbus.version>9.10</nimbus.version>
<commons-beanutils.version>1.9.4</commons-beanutils.version>
<httpclient.version>4.5.13</httpclient.version>
<kafka.version>2.4.1</kafka.version>
Expand Down Expand Up @@ -513,7 +513,7 @@
<plugin>
<groupId>org.owasp</groupId>
<artifactId>dependency-check-maven</artifactId>
<version>6.1.6</version>
<version>6.2.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down

0 comments on commit c807cc4

Please sign in to comment.