Skip to content

Commit

Permalink
Merge pull request odpi#6620 from MihaiIliescu/Data_Engine_Proxy_sync…
Browse files Browse the repository at this point in the history
…_state_stored_in_Egeria_repository

Added ProcessingState classification and endpoints to upsert it
  • Loading branch information
bogdan-sava authored Sep 29, 2022
2 parents b622f9c + a5678ba commit 0b482c5
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ public enum DataEngineErrorCode implements ExceptionMessageSet {
TOPIC_NOT_FOUND(400, "OMAS-DATA-ENGINE-400-011",
"Topic with qualifiedName {0} was not found",
"The system is unable to create a new event type attached to a topic",
"Correct the code in the caller to provide the correct topic qualified name.");
"Correct the code in the caller to provide the correct topic qualified name."),
SOFTWARE_SERVER_CAPABILITY_NOT_FOUND(400, "OMAS-DATA-ENGINE-400-012",
"Software Server Capability with qualifiedName {0} was not found",
"The system is unable to find the searched Software Server Capability",
"Correct the code in the caller to provide the correct Software Server Capability qualified name.");

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/* SPDX-License-Identifier: Apache-2.0 */
/* Copyright Contributors to the ODPi Egeria project. */
package org.odpi.openmetadata.accessservices.dataengine.model;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.Map;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;

/**
* The ProcessingState classification
*/
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@Getter
@Setter
public class ProcessingState extends Referenceable {

@Getter(AccessLevel.NONE)
@Setter(AccessLevel.NONE)
private static final long serialVersionUID = 1L;

/**
* The Sync Dates map of critical elements and sync status
* -- GETTER --
* Gets sync states map.
*
* @return sync states map
* -- SETTER --
* Sets sync states map.
* @param syncDatesByKey the sync states map
*/
@JsonProperty("syncDatesByKey")
private Map<String, Long> syncDatesByKey;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* SPDX-License-Identifier: Apache-2.0 */
/* Copyright Contributors to the ODPi Egeria project. */
package org.odpi.openmetadata.accessservices.dataengine.rest;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessingState;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;

@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString
public class ProcessingStateRequestBody extends DataEngineOMASAPIRequestBody
{
private static final long serialVersionUID = 1L;

/**
* The sync state to be created
* -- GETTER --
* Return the sync state bean
*
* @return the sync state
* -- SETTER --
* Set up the sync state bean
* @param processingState the sync state
*/
@JsonProperty("processingState")
private ProcessingState processingState;
}






Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,37 @@
/* Copyright Contributors to the ODPi Egeria project. */
package org.odpi.openmetadata.accessservices.dataengine.server.handlers;

import org.odpi.openmetadata.accessservices.dataengine.ffdc.DataEngineErrorCode;
import org.odpi.openmetadata.accessservices.dataengine.model.DeleteSemantic;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessingState;
import org.odpi.openmetadata.accessservices.dataengine.model.SoftwareServerCapability;
import org.odpi.openmetadata.accessservices.dataengine.server.builders.ExternalDataEnginePropertiesBuilder;
import org.odpi.openmetadata.accessservices.dataengine.server.mappers.CommonMapper;
import org.odpi.openmetadata.accessservices.dataengine.server.service.ClockService;
import org.odpi.openmetadata.commonservices.ffdc.InvalidParameterHandler;
import org.odpi.openmetadata.commonservices.generichandlers.SoftwareCapabilityHandler;
import org.odpi.openmetadata.frameworks.auditlog.messagesets.ExceptionMessageDefinition;
import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.UserNotAuthorizedException;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.instances.Classification;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.instances.EntityDetail;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.instances.InstanceProperties;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.instances.MapPropertyValue;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.instances.PrimitivePropertyValue;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.properties.typedefs.TypeDef;
import org.odpi.openmetadata.repositoryservices.connectors.stores.metadatacollectionstore.repositoryconnector.OMRSRepositoryHelper;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode;
import org.odpi.openmetadata.repositoryservices.ffdc.exception.FunctionNotSupportedException;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.odpi.openmetadata.accessservices.dataengine.server.mappers.CommonMapper.GUID_PROPERTY_NAME;
import static org.odpi.openmetadata.accessservices.dataengine.server.mappers.CommonMapper.QUALIFIED_NAME_PROPERTY_NAME;
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.PROCESSING_STATE_CLASSIFICATION_TYPE_GUID;
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.PROCESSING_STATE_CLASSIFICATION_TYPE_NAME;
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.SOFTWARE_SERVER_CAPABILITY_TYPE_NAME;

/**
Expand All @@ -31,6 +41,9 @@
* SoftwareCapabilityHandler.
*/
public class DataEngineRegistrationHandler {

private static final String EXTERNAL_ENGINE_PARAMETER_NAME = "externalSourceGUID";
public static final String SYNC_DATES_BY_KEY = "syncDatesByKey";
private final String serviceName;
private final String serverName;
private final OMRSRepositoryHelper repositoryHelper;
Expand Down Expand Up @@ -149,4 +162,64 @@ ExternalDataEnginePropertiesBuilder getExternalDataEnginePropertiesBuilder(Softw
softwareServerCapability.getEngineVersion(), softwareServerCapability.getPatchLevel(), softwareServerCapability.getSource(),
softwareServerCapability.getAdditionalProperties(), repositoryHelper, serviceName, serverName);
}

public void createDataEngineClassification(String userId, ProcessingState processingState, String externalSourceName) throws
InvalidParameterException,
UserNotAuthorizedException,
PropertyServerException {
final String methodName = "createDataEngineClassification";

invalidParameterHandler.validateUserId(userId, methodName);

String externalSourceGUID = this.getExternalDataEngine(userId, externalSourceName);
if (externalSourceGUID == null) {
ExceptionMessageDefinition messageDefinition = DataEngineErrorCode.SOFTWARE_SERVER_CAPABILITY_NOT_FOUND.getMessageDefinition(externalSourceName);
throw new InvalidParameterException(messageDefinition, this.getClass().getName(), methodName, EXTERNAL_ENGINE_PARAMETER_NAME);
}

//Check if the entity has this classification and if it does then merge the syncDatesByKey

TypeDef entityTypeDef = repositoryHelper.getTypeDefByName(userId, SOFTWARE_SERVER_CAPABILITY_TYPE_NAME);
EntityDetail retrievedEntity = softwareServerCapabilityHandler.getEntityByValue(userId, externalSourceName, CommonMapper.QUALIFIED_NAME_PROPERTY_NAME,
entityTypeDef.getGUID(), entityTypeDef.getName(), Collections.singletonList(CommonMapper.QUALIFIED_NAME_PROPERTY_NAME),
false, false, null, methodName);

Map<String, Long> newSyncDatesByKey = new HashMap<>();

if (retrievedEntity.getClassifications() != null) {
for (Classification classification : retrievedEntity.getClassifications()) {
if (classification != null && classification.getName().equals(PROCESSING_STATE_CLASSIFICATION_TYPE_NAME)) {
MapPropertyValue syncDatesByKey = (MapPropertyValue) classification.getProperties().getPropertyValue(SYNC_DATES_BY_KEY);
for (Map.Entry entry : syncDatesByKey.getMapValues().getInstanceProperties().entrySet()) {
newSyncDatesByKey.put(entry.getKey().toString(),
((Long) ((PrimitivePropertyValue) entry.getValue()).getPrimitiveValue()).longValue());
}
newSyncDatesByKey.putAll(processingState.getSyncDatesByKey());
}
}
}

if (newSyncDatesByKey.isEmpty()) {
newSyncDatesByKey = processingState.getSyncDatesByKey();
}

InstanceProperties instanceProperties = new InstanceProperties();
instanceProperties = repositoryHelper.addLongMapPropertyToInstance(null, instanceProperties, SYNC_DATES_BY_KEY,
newSyncDatesByKey, methodName);

softwareServerCapabilityHandler.setClassificationInRepository(userId,
null,
null,
externalSourceGUID,
EXTERNAL_ENGINE_PARAMETER_NAME,
SOFTWARE_SERVER_CAPABILITY_TYPE_NAME,
PROCESSING_STATE_CLASSIFICATION_TYPE_GUID,
PROCESSING_STATE_CLASSIFICATION_TYPE_NAME,
instanceProperties,
true,
false,
false,
null,
methodName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.odpi.openmetadata.accessservices.dataengine.model.PortImplementation;
import org.odpi.openmetadata.accessservices.dataengine.model.Process;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessHierarchy;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessingState;
import org.odpi.openmetadata.accessservices.dataengine.model.Referenceable;
import org.odpi.openmetadata.accessservices.dataengine.model.RelationalTable;
import org.odpi.openmetadata.accessservices.dataengine.model.SchemaType;
Expand All @@ -39,6 +40,7 @@
import org.odpi.openmetadata.accessservices.dataengine.rest.PortImplementationRequestBody;
import org.odpi.openmetadata.accessservices.dataengine.rest.ProcessHierarchyRequestBody;
import org.odpi.openmetadata.accessservices.dataengine.rest.ProcessRequestBody;
import org.odpi.openmetadata.accessservices.dataengine.rest.ProcessingStateRequestBody;
import org.odpi.openmetadata.accessservices.dataengine.rest.RelationalTableRequestBody;
import org.odpi.openmetadata.accessservices.dataengine.rest.SchemaTypeRequestBody;
import org.odpi.openmetadata.accessservices.dataengine.rest.TopicRequestBody;
Expand Down Expand Up @@ -97,6 +99,7 @@
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.PORT_ALIAS_TYPE_NAME;
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.PORT_IMPLEMENTATION_TYPE_NAME;
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.PORT_TYPE_NAME;
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.PROCESSING_STATE_CLASSIFICATION_TYPE_NAME;
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.PROCESS_TYPE_NAME;
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.QUALIFIED_NAME_PROPERTY_NAME;
import static org.odpi.openmetadata.commonservices.generichandlers.OpenMetadataAPIMapper.QUOTE_CHARACTER_PROPERTY_NAME;
Expand Down Expand Up @@ -2172,6 +2175,55 @@ public void deleteEventType(String userId, String serverName, String externalSou
log.debug(DEBUG_DELETE_MESSAGE, eventTypeGUID, TOPIC_TYPE_NAME);
}

/**
* Create or update the ProcessingState with provided map of critical elements and sync states
*
* @param serverName name of server instance to call
* @param userId the name of the calling user
* @param processingStateRequestBody map of critical elements and sync states
* @return void response
*/
public VoidResponse upsertProcessingState(String userId, String serverName, ProcessingStateRequestBody processingStateRequestBody) {
final String methodName = "upsertProcessingState";

VoidResponse response = new VoidResponse();
try {
validateRequestBody(userId, serverName, processingStateRequestBody, methodName);

ProcessingState processingState = processingStateRequestBody.getProcessingState();
if (processingState == null) {
restExceptionHandler.handleMissingValue("processingState", methodName);
return response;
}
return upsertProcessingState(userId, serverName, processingState, processingStateRequestBody.getExternalSourceName());
} catch (Exception error) {
restExceptionHandler.captureExceptions(response, error, methodName);
}
return response;
}

/**
* Create or update the ProcessingState with provided map of critical elements and sync states
*
* @param serverName name of server instance to call
* @param userId the name of the calling user
* @param processingState map of critical elements and sync states
* @param externalSourceName the unique name of the external source
* @return void response
*/
public VoidResponse upsertProcessingState(String userId, String serverName, ProcessingState processingState, String externalSourceName) {
final String methodName = "upsertProcessingState";

VoidResponse response = new VoidResponse();
try {
DataEngineRegistrationHandler handler = instanceHandler.getRegistrationHandler(userId, serverName, methodName);
handler.createDataEngineClassification(userId, processingState, externalSourceName);
} catch (Exception error) {
restExceptionHandler.captureExceptions(response, error, methodName);
}
return response;
}

private boolean isTopicRequestBodyValid(String userId, String serverName, TopicRequestBody topicRequestBody, String methodName) throws
InvalidParameterException {
validateRequestBody(userId, serverName, topicRequestBody, methodName);
Expand Down
Loading

0 comments on commit 0b482c5

Please sign in to comment.