Skip to content

Commit

Permalink
Update subscriptions instead of removing all (#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
saschadoemer authored Jun 19, 2024
1 parent 44c283d commit c0bd945
Showing 1 changed file with 63 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,38 +47,85 @@ public SubscriptionIntegrationService(String contextBrokerUrl, List<String> noti
* each representing a different type.
*/
public void subscribe(Tenant tenant, EntityType... entityTypes) {
removeAll(tenant); // FIXME adjust and update subscriptions instead of removing all
var httpClient = HttpClient.newHttpClient();
var subscriptions = createSubscriptions(entityTypes);
for (var subscription : subscriptions) {
String json = toJson(subscription);
log.debug("Creating subscription: " + json);
List<Subscription> allExistingSubscriptions = findAll(tenant);
if (!allExistingSubscriptions.isEmpty()) {
updateExistingSubscriptions(tenant, allExistingSubscriptions, entityTypes);
} else {
var httpClient = HttpClient.newHttpClient();
var subscriptions = createSubscriptions(entityTypes);
for (var subscription : subscriptions) {
String json = toJson(subscription);
log.debug("Creating subscription: {}", json);
var httpRequest = HttpRequest.newBuilder()
.uri(URI.create(contextBrokerUrlForCommands() + "/subscriptions"))
.header("Content-Type", "application/json")
.header(CustomHeader.FIWARE_SERVICE, tenant.getTenantId())
.POST(HttpRequest.BodyPublishers.ofString(json)).build();
try {
var response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 201) {
log.error("Could not create subscription. Response: {}", response.body());
throw new BusinessException(ErrorMessage.builder()
.message("Could not create subscription, there was an error from FIWARE.")
.error(Error.FIWARE_INTEGRATION_LAYER_ERROR)
.build());
} else {
log.info("Subscription created/updated successfully.");
}
} catch (Exception e) {
log.error("Could not create subscription.", e);
throw new BusinessException(ErrorMessage.builder()
.message("Could not create subscription.")
.error(Error.FIWARE_INTEGRATION_LAYER_ERROR)
.build());
}
}
}
}

private void updateExistingSubscriptions(Tenant tenant, List<Subscription> allExistingSubscriptions, EntityType[] newEntityTypes) {
allExistingSubscriptions.forEach(subscription -> {
var newSubscription = Subscription.builder()
.id(subscription.getId())
.description(subscription.getDescription())
.subject(Subject.builder()
.entities(createSubscriptionEntities(newEntityTypes))
.build())
.notification(Notification.builder()
.http(Http.builder()
.url(subscription.getNotification().getHttp().getUrl())
.build())
.build())
.expires(subscription.getExpires())
.status(subscription.getStatus())
.build();
var httpClient = HttpClient.newHttpClient();
String json = toJson(newSubscription);
log.debug("Updating subscription: {}", json);
var httpRequest = HttpRequest.newBuilder()
.uri(URI.create(contextBrokerUrlForCommands() + "/subscriptions"))
.uri(URI.create(contextBrokerUrlForCommands() + "/subscriptions/" + subscription.getId()))
.header("Content-Type", "application/json")
.header(CustomHeader.FIWARE_SERVICE, tenant.getTenantId())
.POST(HttpRequest.BodyPublishers.ofString(json)).build();
.PUT(HttpRequest.BodyPublishers.ofString(json)).build();
try {
var response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 201) {
log.error("Could not create subscription. Response: {}", response.body());
log.debug("Request: {}", json);
log.debug("Response: {}", response.body());
log.error("Could not update subscription. Response: {}", response.body());
throw new BusinessException(ErrorMessage.builder()
.message("Could not create subscription, there was an error from FIWARE.")
.message("Could not update subscription, there was an error from FIWARE.")
.error(Error.FIWARE_INTEGRATION_LAYER_ERROR)
.build());
} else {
log.info("Subscription created/updated successfully.");
log.info("Subscription updated successfully.");
}
} catch (Exception e) {
log.error("Could not create subscription.", e);
log.error("Could not update subscription.", e);
throw new BusinessException(ErrorMessage.builder()
.message("Could not create subscription.")
.message("Could not update subscription.")
.error(Error.FIWARE_INTEGRATION_LAYER_ERROR)
.build());
}
}
});
}

private List<Subscription> createSubscriptions(EntityType... entityTypes) {
Expand Down

0 comments on commit c0bd945

Please sign in to comment.