Skip to content
This repository has been archived by the owner on Jun 10, 2024. It is now read-only.

Commit

Permalink
Merge pull request #304 from cmgrote/master
Browse files Browse the repository at this point in the history
Propagate Atlas delete events as either deletes or purges (configurable)
  • Loading branch information
cmgrote authored Jun 10, 2021
2 parents c807cc4 + a1f0471 commit a5c6f18
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,34 +330,58 @@ private void processDeletedEntity(AtlasEntityHeader atlasEntityHeader) {
if (typeDef.getCategory() == TypeDefCategory.ENTITY_DEF) {
AtlasGuid entityGUID = new AtlasGuid(atlasEntityHeader.getGuid(), prefix);
try {
EntityDetail entityDetail = repositoryHelper.getSkeletonEntity(
sourceName,
metadataCollectionId,
InstanceProvenanceType.LOCAL_COHORT,
null,
typeDef.getName()
);
entityDetail.setGUID(entityGUID.toString());
repositoryEventProcessor.processDeletedEntityEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
entityDetail
);
// Also send events for any self-referencing (generated) relationships
List<Relationship> generatedRelationships = getGeneratedRelationshipsForEntity(atlasEntityHeader, prefix);
for (Relationship generatedRelationship : generatedRelationships) {
repositoryEventProcessor.processDeletedRelationshipEvent(
if (atlasRepositoryConnector.sendPurgeForDelete()) {
repositoryEventProcessor.processPurgedEntityEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
generatedRelationship
typeDef.getGUID(),
typeDef.getName(),
entityGUID.toString()
);
} else {
EntityDetail entityDetail = repositoryHelper.getSkeletonEntity(
sourceName,
metadataCollectionId,
InstanceProvenanceType.LOCAL_COHORT,
null,
typeDef.getName()
);
entityDetail.setGUID(entityGUID.toString());
repositoryEventProcessor.processDeletedEntityEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
entityDetail
);
}
// Also send events for any self-referencing (generated) relationships
List<Relationship> generatedRelationships = getGeneratedRelationshipsForEntity(atlasEntityHeader, prefix);
for (Relationship generatedRelationship : generatedRelationships) {
if (atlasRepositoryConnector.sendPurgeForDelete()) {
repositoryEventProcessor.processPurgedRelationshipEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
generatedRelationship
);
} else {
repositoryEventProcessor.processDeletedRelationshipEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
generatedRelationship
);
}
}
} catch (TypeErrorException e) {
log.error("Unable to process an entity delete for: {}", entityGUID, e);
}
Expand All @@ -384,22 +408,35 @@ private void processDeletedRelationship(AtlasRelationshipHeader atlasRelationshi
if (typeDef.getCategory() == TypeDefCategory.RELATIONSHIP_DEF) {
AtlasGuid relationshipGUID = new AtlasGuid(atlasRelationshipHeader.getGuid(), prefix);
try {
Relationship relationship = repositoryHelper.getSkeletonRelationship(
sourceName,
metadataCollectionId,
InstanceProvenanceType.LOCAL_COHORT,
null,
typeDef.getName()
);
relationship.setGUID(relationshipGUID.toString());
repositoryEventProcessor.processDeletedRelationshipEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
relationship
);
if (atlasRepositoryConnector.sendPurgeForDelete()) {
repositoryEventProcessor.processPurgedRelationshipEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
typeDef.getGUID(),
typeDef.getName(),
relationshipGUID.toString()
);
} else {
Relationship relationship = repositoryHelper.getSkeletonRelationship(
sourceName,
metadataCollectionId,
InstanceProvenanceType.LOCAL_COHORT,
null,
typeDef.getName()
);
relationship.setGUID(relationshipGUID.toString());
repositoryEventProcessor.processDeletedRelationshipEvent(
sourceName,
metadataCollectionId,
originatorServerName,
originatorServerType,
localOrganizationName,
relationship
);
}
} catch (TypeErrorException e) {
log.error("Unable to process a relationship delete for: {}", relationshipGUID, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ApacheAtlasOMRSRepositoryConnector extends OMRSRepositoryConnector
private AtlasClientV2 atlasClient;
private Map<String, AtlasEntityDef> atlasEntityTypesByName;

private boolean purgeForDelete = false;
private boolean successfulInit = false;

/**
Expand Down Expand Up @@ -101,6 +102,16 @@ public String getBaseURL() {
return this.url;
}

/**
* Indicates whether a purge event should be sent in the case of a delete being received (true) or
* simply to propagate a delete (false). Default is to propagate delete as a delete.
*
* @return boolean
*/
public boolean sendPurgeForDelete() {
return this.purgeForDelete;
}

/**
* Indicates whether the provided TypeDef exists in this Apache Atlas environment.
*
Expand Down Expand Up @@ -320,6 +331,14 @@ private void connectToAtlas(String methodName) throws ConnectorCheckedException

auditLog.logMessage(methodName, ApacheAtlasOMRSAuditCode.CONNECTED_TO_ATLAS.getMessageDefinition(getBaseURL()));

Map<String, Object> configProperties = connectionProperties.getConfigurationProperties();
if (configProperties != null) {
Object candidate = configProperties.get(ApacheAtlasOMRSRepositoryConnectorProvider.PURGE_FOR_DELETE);
if (candidate instanceof Boolean) {
purgeForDelete = (Boolean) candidate;
}
}

metadataCollection = new ApacheAtlasOMRSMetadataCollection(this,
serverName,
repositoryHelper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import org.odpi.openmetadata.frameworks.connectors.properties.beans.ConnectorType;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.repositoryconnector.OMRSRepositoryConnectorProviderBase;

import java.util.ArrayList;
import java.util.List;

/**
* In the Open Connector Framework (OCF), a ConnectorProvider is a factory for a specific type of connector.
* The ApacheAtlasOMRSRepositoryConnectorProvider is the connector provider for the ApacheAtlasOMRSRepositoryConnector.
Expand All @@ -21,6 +24,8 @@ public class ApacheAtlasOMRSRepositoryConnectorProvider extends OMRSRepositoryCo
static final String CONNECTOR_TYPE_NAME = "OMRS Apache Atlas Repository Connector";
static final String CONNECTOR_TYPE_DESC = "OMRS Apache Atlas Repository Connector that processes events from the Apache Atlas repository store.";

public static final String PURGE_FOR_DELETE = "purgeForDelete";

/**
* Constructor used to initialize the ConnectorProviderBase with the Java class name of the specific
* OMRS Connector implementation.
Expand All @@ -38,6 +43,10 @@ public ApacheAtlasOMRSRepositoryConnectorProvider() {
connectorType.setDescription(CONNECTOR_TYPE_DESC);
connectorType.setConnectorProviderClassName(this.getClass().getName());

List<String> knownConfigProperties = new ArrayList<>();
knownConfigProperties.add(PURGE_FOR_DELETE);
connectorType.setRecognizedConfigurationProperties(knownConfigProperties);

super.connectorTypeBean = connectorType;

}
Expand Down

0 comments on commit a5c6f18

Please sign in to comment.