Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
fvolz committed Sep 21, 2023
1 parent 044097e commit 829276b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import de.fraunhofer.iosb.ilt.faaast.service.model.api.Response;
import de.fraunhofer.iosb.ilt.faaast.service.model.exception.RegistryException;
import de.fraunhofer.iosb.ilt.faaast.service.persistence.Persistence;
import de.fraunhofer.iosb.ilt.faaast.service.registration.RegistryHandler;
import de.fraunhofer.iosb.ilt.faaast.service.request.RequestHandlerManager;
import de.fraunhofer.iosb.ilt.faaast.service.typing.TypeInfo;
import de.fraunhofer.iosb.ilt.faaast.service.util.DeepCopyHelper;
Expand All @@ -58,7 +59,7 @@ public class Service implements ServiceContext {
private MessageBus messageBus;
private Persistence persistence;
private RequestHandlerManager requestHandler;
private FaaastRegistryHandler registryHandler;
private RegistryHandler registryHandler;

/**
* Creates a new instance of {@link Service}.
Expand Down Expand Up @@ -182,7 +183,7 @@ public AssetConnectionManager getAssetConnectionManager() {
public void start() throws MessageBusException, EndpointException {
LOGGER.debug("Get command for starting FA³ST Service");
messageBus.start();
registryHandler = new FaaastRegistryHandler(messageBus, persistence, config.getCore());
registryHandler = new RegistryHandler(messageBus, persistence, config.getCore());
if (!endpoints.isEmpty()) {
LOGGER.info("Starting endpoints...");
}
Expand All @@ -206,13 +207,10 @@ public void stop() {
try {
registryHandler.deleteAllAasInRegistry();
}
catch (InterruptedException e) {
LOGGER.warn(FaaastRegistryHandler.THREAD_INTERRUPTION_ERROR);
catch (InterruptedException | RegistryException e) {
LOGGER.error(String.format("Deregistration in Registry failed: %s", e.getMessage()), e);
Thread.currentThread().interrupt();
}
catch (RegistryException e) {
LOGGER.error(String.format("Unregistration in Fa³st-Registry failed: %s", e.getMessage()), e);
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.ilt.faaast.service;
package de.fraunhofer.iosb.ilt.faaast.service.registration;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
Expand All @@ -34,38 +33,32 @@
import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.event.change.ElementUpdateEventMessage;
import de.fraunhofer.iosb.ilt.faaast.service.persistence.Persistence;
import de.fraunhofer.iosb.ilt.faaast.service.util.Ensure;
import de.fraunhofer.iosb.ilt.faaast.service.util.LambdaExceptionHelper;
import io.adminshell.aas.v3.model.AssetAdministrationShell;
import io.adminshell.aas.v3.model.AssetAdministrationShellEnvironment;
import io.adminshell.aas.v3.model.KeyElements;
import io.adminshell.aas.v3.model.Reference;
import io.adminshell.aas.v3.model.Submodel;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Class to handle the synchronisation of assetAdministrationShells and submodels
* with the Fa³st Registry.
* with the Registry.
*/
public class FaaastRegistryHandler {
private static final String SYNC_ERROR_FORMAT_STRING = "Synchronisation with Fa³st-Registry failed: %s";
private static final String REGISTRY_CONNECTION_ERROR = "Connection to FA³ST-Registry failed!";
private static final String REQUEST_ERROR_FORMAT_STRING = "HTTP request failed with %d";
private static final String REQUEST_INTERRUPTION_ERROR = "HTTP request failed";
public static final String THREAD_INTERRUPTION_ERROR = "Registry handler interrupted!";
private static final Logger LOGGER = LoggerFactory.getLogger(FaaastRegistryHandler.class);
public class RegistryHandler {
private static final String SYNC_EXCEPTION = "Synchronisation with Registry exception: %s";
private static final String SYNC_EVENT_ERROR = "Synchronisation of changes with Registry failed.";
private static final Logger LOGGER = LoggerFactory.getLogger(RegistryHandler.class);

private final Persistence persistence;
private final CoreConfig coreConfig;
Expand All @@ -74,71 +67,28 @@ public class FaaastRegistryHandler {
.enable(SerializationFeature.INDENT_OUTPUT)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
private final AssetAdministrationShellEnvironment aasEnv;
private final AssetAdministrationShellEnvironment environment;

FaaastRegistryHandler(MessageBus messageBus, Persistence persistence, CoreConfig coreConfig) throws MessageBusException {
public RegistryHandler(MessageBus messageBus, Persistence persistence, CoreConfig coreConfig) throws MessageBusException {
this.persistence = persistence;
this.coreConfig = coreConfig;
httpClient = HttpClient.newBuilder().build();
messageBus.subscribe(SubscriptionInfo.create(ElementCreateEventMessage.class, m -> {
try {
handleCreateEvent(m);
}
catch (InterruptedException e) {
LOGGER.warn(THREAD_INTERRUPTION_ERROR);
Thread.currentThread().interrupt();
}
catch (Exception e) {
LOGGER.error(String.format(SYNC_ERROR_FORMAT_STRING, e.getMessage()), e);
}
}));
messageBus.subscribe(SubscriptionInfo.create(ElementUpdateEventMessage.class, m -> {
try {
handleChangeEvent(m);
}
catch (InterruptedException e) {
LOGGER.warn(THREAD_INTERRUPTION_ERROR);
Thread.currentThread().interrupt();
}
catch (Exception e) {
LOGGER.error(String.format(SYNC_ERROR_FORMAT_STRING, e.getMessage()), e);
}
}));
messageBus.subscribe(SubscriptionInfo.create(ElementDeleteEventMessage.class, m -> {
try {
handleDeleteEvent(m);
}
catch (InterruptedException e) {
LOGGER.warn(THREAD_INTERRUPTION_ERROR);
Thread.currentThread().interrupt();
}
catch (Exception e) {
LOGGER.error(String.format(SYNC_ERROR_FORMAT_STRING, e.getMessage()), e);
}
}));
aasEnv = persistence.getEnvironment();

messageBus.subscribe(SubscriptionInfo.create(ElementCreateEventMessage.class, LambdaExceptionHelper.wrap(m -> handleCreateEvent(m))));
messageBus.subscribe(SubscriptionInfo.create(ElementUpdateEventMessage.class, LambdaExceptionHelper.wrap(m -> handleChangeEvent(m))));
messageBus.subscribe(SubscriptionInfo.create(ElementDeleteEventMessage.class, LambdaExceptionHelper.wrap(m -> handleDeleteEvent(m))));
environment = persistence.getEnvironment();
try {
createAllAasInRegistry();
if (Objects.isNull(environment) || environment.getAssetAdministrationShells().isEmpty())
return;
for (AssetAdministrationShell aas: environment.getAssetAdministrationShells()) {
createIdentifiableInRegistry(getAasDescriptor(aas), coreConfig.getAasRegistryBasePath());
}
LOGGER.info("Registration of FA³ST Service in Registry successful.");
}
catch (InterruptedException e) {
LOGGER.warn(THREAD_INTERRUPTION_ERROR);
catch (RegistryException | InterruptedException e) {
LOGGER.error(String.format(SYNC_EXCEPTION, e.getMessage()), e);
Thread.currentThread().interrupt();
}
catch (Exception e) {
LOGGER.error(String.format(SYNC_ERROR_FORMAT_STRING, e.getMessage()), e);
}
}


private void createAllAasInRegistry() throws RegistryException, InterruptedException {
if (aasEnv == null || aasEnv.getAssetAdministrationShells().isEmpty())
return;

for (AssetAdministrationShell aas: aasEnv.getAssetAdministrationShells()) {
createIdentifiableInRegistry(getAasDescriptor(aas), coreConfig.getAasRegistryBasePath());
}

}


Expand All @@ -148,10 +98,10 @@ private void createAllAasInRegistry() throws RegistryException, InterruptedExcep
* @throws RegistryException
*/
public void deleteAllAasInRegistry() throws RegistryException, InterruptedException {
if (aasEnv == null || aasEnv.getAssetAdministrationShells().isEmpty())
if (Objects.isNull(environment) || environment.getAssetAdministrationShells().isEmpty())
return;

for (AssetAdministrationShell aas: aasEnv.getAssetAdministrationShells()) {
for (AssetAdministrationShell aas: environment.getAssetAdministrationShells()) {
AssetAdministrationShellDescriptor descriptor = DefaultAssetAdministrationShellDescriptor.builder().from(aas).build();
deleteIdentifiableInRegistry(descriptor.getIdentification().getIdentifier(), coreConfig.getAasRegistryBasePath());
}
Expand Down Expand Up @@ -221,113 +171,66 @@ private boolean referenceIsKeyElement(Reference reference, KeyElements keyElemen
}


private void createIdentifiableInRegistry(AbstractIdentifiableDescriptor descriptor, String basePath) throws RegistryException, InterruptedException {
String body;
URL url;
try {
body = mapper.writeValueAsString(descriptor);
url = new URL("HTTP", coreConfig.getRegistryHost(), coreConfig.getRegistryPort(), basePath);
}
catch (JsonProcessingException | MalformedURLException e) {
throw new RegistryException(e);
}
private static boolean is2xxSuccessful(int statusCode) {
return statusCode >= 200 && statusCode <= 299;
}


private void createIdentifiableInRegistry(AbstractIdentifiableDescriptor descriptor, String basePath) throws RegistryException, InterruptedException {
try {
HttpResponse<String> response = execute(
url,
new URL("HTTP", coreConfig.getRegistryHost(), coreConfig.getRegistryPort(), basePath),
"",
"POST",
HttpRequest.BodyPublishers.ofString(body),
HttpRequest.BodyPublishers.ofString(mapper.writeValueAsString(descriptor)),
HttpResponse.BodyHandlers.ofString(),
null);

if (response == null) {
throw new RegistryException(REQUEST_INTERRUPTION_ERROR);
}

if (!is2xxSuccessful(response)) {
throw new RegistryException(String.format(REQUEST_ERROR_FORMAT_STRING, response.statusCode()));
if (!is2xxSuccessful(response.statusCode())) {
LOGGER.warn(String.format(SYNC_EVENT_ERROR));
}
}
catch (InterruptedException e) {
throw e;
}
catch (URISyntaxException | IOException e) {
throw new RegistryException(REGISTRY_CONNECTION_ERROR);
catch (Exception e) {
LOGGER.error(String.format(SYNC_EXCEPTION, e.getMessage()), e);
}
}


private void updateIdentifiableInRegistry(String identifier, AbstractIdentifiableDescriptor descriptor, String basePath) throws RegistryException, InterruptedException {
String body;
URL url;
try {
body = mapper.writeValueAsString(descriptor);
url = new URL("HTTP", coreConfig.getRegistryHost(), coreConfig.getRegistryPort(),
basePath + "/" + Base64.getEncoder().encodeToString(identifier.getBytes()));
}
catch (MalformedURLException | JsonProcessingException e) {
throw new RegistryException(e);
}

try {
HttpResponse<String> response = execute(
url,
new URL("HTTP", coreConfig.getRegistryHost(), coreConfig.getRegistryPort(),
basePath + "/" + Base64.getEncoder().encodeToString(identifier.getBytes())),
"",
"PUT",
HttpRequest.BodyPublishers.ofString(body),
HttpRequest.BodyPublishers.ofString(mapper.writeValueAsString(descriptor)),
HttpResponse.BodyHandlers.ofString(),
null);

if (response == null) {
throw new RegistryException(REQUEST_INTERRUPTION_ERROR);
}

if (!is2xxSuccessful(response)) {
throw new RegistryException(String.format(REQUEST_ERROR_FORMAT_STRING, response.statusCode()));
if (!is2xxSuccessful(response.statusCode())) {
LOGGER.warn(String.format(SYNC_EVENT_ERROR));
}
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
throw new RegistryException(REGISTRY_CONNECTION_ERROR);
LOGGER.error(String.format(SYNC_EXCEPTION, e.getMessage()), e);
}
}


private void deleteIdentifiableInRegistry(String identifier, String basePath) throws RegistryException, InterruptedException {
URL url;
try {
url = new URL("HTTP", coreConfig.getRegistryHost(), coreConfig.getRegistryPort(),
basePath + "/" + Base64.getEncoder().encodeToString(identifier.getBytes()));
}
catch (MalformedURLException e) {
throw new RegistryException(e);
}

try {
HttpResponse<String> response = execute(
url,
new URL("HTTP", coreConfig.getRegistryHost(), coreConfig.getRegistryPort(),
basePath + "/" + Base64.getEncoder().encodeToString(identifier.getBytes())),
"",
"DELETE",
HttpRequest.BodyPublishers.noBody(),
HttpResponse.BodyHandlers.ofString(),
null);

if (response == null) {
throw new RegistryException(REQUEST_INTERRUPTION_ERROR);
}

if (!is2xxSuccessful(response)) {
throw new RegistryException(String.format(REQUEST_ERROR_FORMAT_STRING, response.statusCode()));
if (!is2xxSuccessful(response.statusCode())) {
LOGGER.warn(String.format(SYNC_EVENT_ERROR));
}
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
throw new RegistryException(REGISTRY_CONNECTION_ERROR);
LOGGER.error(String.format(SYNC_EXCEPTION, e.getMessage()), e);
}
}

Expand Down Expand Up @@ -359,16 +262,6 @@ private <T> HttpResponse<T> execute(
}


private boolean is2xxSuccessful(HttpResponse<?> response) {
return response != null && is2xxSuccessful(response.statusCode());
}


private static boolean is2xxSuccessful(int statusCode) {
return statusCode >= 200 && statusCode <= 299;
}


private AbstractIdentifiableDescriptor getAasDescriptor(AssetAdministrationShell aas) {
return DefaultAssetAdministrationShellDescriptor.builder()
.from(aas)
Expand All @@ -394,7 +287,7 @@ private List<SubmodelDescriptor> getSubmodelDescriptorsFromAas(AssetAdministrati
getSubmodelFromIdentifier(submodelReference.getKeys().get(0).getValue())).build());
}
catch (Exception e) {
LOGGER.error(e.getMessage());
LOGGER.error(String.format(SYNC_EXCEPTION, e.getMessage()), e);
}
return submodelDescriptors;
}
Expand Down
Loading

0 comments on commit 829276b

Please sign in to comment.