diff --git a/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/eventmapper/ApacheAtlasOMRSRepositoryEventMapper.java b/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/eventmapper/ApacheAtlasOMRSRepositoryEventMapper.java index 2e0e62e..f521596 100644 --- a/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/eventmapper/ApacheAtlasOMRSRepositoryEventMapper.java +++ b/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/eventmapper/ApacheAtlasOMRSRepositoryEventMapper.java @@ -330,34 +330,58 @@ private void processDeletedEntity(AtlasEntityHeader atlasEntityHeader) { if (typeDef.getCategory() == TypeDefCategory.ENTITY_DEF) { AtlasGuid entityGUID = new AtlasGuid(atlasEntityHeader.getGuid(), prefix); try { - EntityDetail entityDetail = repositoryHelper.getSkeletonEntity( - sourceName, - metadataCollectionId, - InstanceProvenanceType.LOCAL_COHORT, - null, - typeDef.getName() - ); - entityDetail.setGUID(entityGUID.toString()); - repositoryEventProcessor.processDeletedEntityEvent( - sourceName, - metadataCollectionId, - originatorServerName, - originatorServerType, - localOrganizationName, - entityDetail - ); - // Also send events for any self-referencing (generated) relationships - List generatedRelationships = getGeneratedRelationshipsForEntity(atlasEntityHeader, prefix); - for (Relationship generatedRelationship : generatedRelationships) { - repositoryEventProcessor.processDeletedRelationshipEvent( + if (atlasRepositoryConnector.sendPurgeForDelete()) { + repositoryEventProcessor.processPurgedEntityEvent( sourceName, metadataCollectionId, originatorServerName, originatorServerType, localOrganizationName, - generatedRelationship + typeDef.getGUID(), + typeDef.getName(), + entityGUID.toString() + ); + } else { + EntityDetail entityDetail = repositoryHelper.getSkeletonEntity( + sourceName, + metadataCollectionId, + InstanceProvenanceType.LOCAL_COHORT, + null, + typeDef.getName() + ); + entityDetail.setGUID(entityGUID.toString()); + repositoryEventProcessor.processDeletedEntityEvent( + sourceName, + metadataCollectionId, + originatorServerName, + originatorServerType, + localOrganizationName, + entityDetail ); } + // Also send events for any self-referencing (generated) relationships + List generatedRelationships = getGeneratedRelationshipsForEntity(atlasEntityHeader, prefix); + for (Relationship generatedRelationship : generatedRelationships) { + if (atlasRepositoryConnector.sendPurgeForDelete()) { + repositoryEventProcessor.processPurgedRelationshipEvent( + sourceName, + metadataCollectionId, + originatorServerName, + originatorServerType, + localOrganizationName, + generatedRelationship + ); + } else { + repositoryEventProcessor.processDeletedRelationshipEvent( + sourceName, + metadataCollectionId, + originatorServerName, + originatorServerType, + localOrganizationName, + generatedRelationship + ); + } + } } catch (TypeErrorException e) { log.error("Unable to process an entity delete for: {}", entityGUID, e); } @@ -384,22 +408,35 @@ private void processDeletedRelationship(AtlasRelationshipHeader atlasRelationshi if (typeDef.getCategory() == TypeDefCategory.RELATIONSHIP_DEF) { AtlasGuid relationshipGUID = new AtlasGuid(atlasRelationshipHeader.getGuid(), prefix); try { - Relationship relationship = repositoryHelper.getSkeletonRelationship( - sourceName, - metadataCollectionId, - InstanceProvenanceType.LOCAL_COHORT, - null, - typeDef.getName() - ); - relationship.setGUID(relationshipGUID.toString()); - repositoryEventProcessor.processDeletedRelationshipEvent( - sourceName, - metadataCollectionId, - originatorServerName, - originatorServerType, - localOrganizationName, - relationship - ); + if (atlasRepositoryConnector.sendPurgeForDelete()) { + repositoryEventProcessor.processPurgedRelationshipEvent( + sourceName, + metadataCollectionId, + originatorServerName, + originatorServerType, + localOrganizationName, + typeDef.getGUID(), + typeDef.getName(), + relationshipGUID.toString() + ); + } else { + Relationship relationship = repositoryHelper.getSkeletonRelationship( + sourceName, + metadataCollectionId, + InstanceProvenanceType.LOCAL_COHORT, + null, + typeDef.getName() + ); + relationship.setGUID(relationshipGUID.toString()); + repositoryEventProcessor.processDeletedRelationshipEvent( + sourceName, + metadataCollectionId, + originatorServerName, + originatorServerType, + localOrganizationName, + relationship + ); + } } catch (TypeErrorException e) { log.error("Unable to process a relationship delete for: {}", relationshipGUID, e); } diff --git a/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/repositoryconnector/ApacheAtlasOMRSRepositoryConnector.java b/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/repositoryconnector/ApacheAtlasOMRSRepositoryConnector.java index fb3bc0a..4b73c02 100644 --- a/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/repositoryconnector/ApacheAtlasOMRSRepositoryConnector.java +++ b/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/repositoryconnector/ApacheAtlasOMRSRepositoryConnector.java @@ -37,6 +37,7 @@ public class ApacheAtlasOMRSRepositoryConnector extends OMRSRepositoryConnector private AtlasClientV2 atlasClient; private Map atlasEntityTypesByName; + private boolean purgeForDelete = false; private boolean successfulInit = false; /** @@ -101,6 +102,16 @@ public String getBaseURL() { return this.url; } + /** + * Indicates whether a purge event should be sent in the case of a delete being received (true) or + * simply to propagate a delete (false). Default is to propagate delete as a delete. + * + * @return boolean + */ + public boolean sendPurgeForDelete() { + return this.purgeForDelete; + } + /** * Indicates whether the provided TypeDef exists in this Apache Atlas environment. * @@ -320,6 +331,14 @@ private void connectToAtlas(String methodName) throws ConnectorCheckedException auditLog.logMessage(methodName, ApacheAtlasOMRSAuditCode.CONNECTED_TO_ATLAS.getMessageDefinition(getBaseURL())); + Map configProperties = connectionProperties.getConfigurationProperties(); + if (configProperties != null) { + Object candidate = configProperties.get(ApacheAtlasOMRSRepositoryConnectorProvider.PURGE_FOR_DELETE); + if (candidate instanceof Boolean) { + purgeForDelete = (Boolean) candidate; + } + } + metadataCollection = new ApacheAtlasOMRSMetadataCollection(this, serverName, repositoryHelper, diff --git a/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/repositoryconnector/ApacheAtlasOMRSRepositoryConnectorProvider.java b/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/repositoryconnector/ApacheAtlasOMRSRepositoryConnectorProvider.java index ee8fa6e..aec29e8 100644 --- a/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/repositoryconnector/ApacheAtlasOMRSRepositoryConnectorProvider.java +++ b/apache-atlas-adapter/src/main/java/org/odpi/egeria/connectors/apache/atlas/repositoryconnector/ApacheAtlasOMRSRepositoryConnectorProvider.java @@ -5,6 +5,9 @@ import org.odpi.openmetadata.frameworks.connectors.properties.beans.ConnectorType; import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.repositoryconnector.OMRSRepositoryConnectorProviderBase; +import java.util.ArrayList; +import java.util.List; + /** * In the Open Connector Framework (OCF), a ConnectorProvider is a factory for a specific type of connector. * The ApacheAtlasOMRSRepositoryConnectorProvider is the connector provider for the ApacheAtlasOMRSRepositoryConnector. @@ -21,6 +24,8 @@ public class ApacheAtlasOMRSRepositoryConnectorProvider extends OMRSRepositoryCo static final String CONNECTOR_TYPE_NAME = "OMRS Apache Atlas Repository Connector"; static final String CONNECTOR_TYPE_DESC = "OMRS Apache Atlas Repository Connector that processes events from the Apache Atlas repository store."; + public static final String PURGE_FOR_DELETE = "purgeForDelete"; + /** * Constructor used to initialize the ConnectorProviderBase with the Java class name of the specific * OMRS Connector implementation. @@ -38,6 +43,10 @@ public ApacheAtlasOMRSRepositoryConnectorProvider() { connectorType.setDescription(CONNECTOR_TYPE_DESC); connectorType.setConnectorProviderClassName(this.getClass().getName()); + List knownConfigProperties = new ArrayList<>(); + knownConfigProperties.add(PURGE_FOR_DELETE); + connectorType.setRecognizedConfigurationProperties(knownConfigProperties); + super.connectorTypeBean = connectorType; }