From 085e5e32a416ecb02f8938793662bd9946bdd4c8 Mon Sep 17 00:00:00 2001 From: Daniel Krueger Date: Tue, 15 Oct 2024 16:29:46 +0200 Subject: [PATCH] move status check of the adapters to enable cleanup of data policies that were created by not yet started adapters --- .../protocols/ProtocolAdapterManager.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java b/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java index 94c71013a..edceca8aa 100644 --- a/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java @@ -37,7 +37,6 @@ import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState; import com.hivemq.adapter.sdk.api.writing.WritingContext; import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter; -import com.hivemq.bootstrap.factories.WritingServiceProvider; import com.hivemq.configuration.service.ConfigurationService; import com.hivemq.edge.HiveMQEdgeRemoteService; import com.hivemq.edge.VersionProvider; @@ -318,8 +317,7 @@ CompletableFuture start(final @NotNull ProtocolAdapterWrapper protocolAdap private @NotNull CompletableFuture startWriting(final @NotNull ProtocolAdapterWrapper protocolAdapterWrapper) { final CompletableFuture startWritingFuture; - if (writingEnabled() && - protocolAdapterWrapper.getAdapter() instanceof WritingProtocolAdapter) { + if (writingEnabled() && protocolAdapterWrapper.getAdapter() instanceof WritingProtocolAdapter) { if (log.isDebugEnabled()) { log.debug("Start writing for protocol adapter with id '{}'", protocolAdapterWrapper.getId()); } @@ -377,8 +375,13 @@ private void schedulePolling(final @NotNull ProtocolAdapterWrapper protocolAdapt return stopWritingFuture.thenComposeAsync(ignored -> { final ProtocolAdapterStopOutputImpl adapterStopOutput = new ProtocolAdapterStopOutputImpl(); - protocolAdapterWrapper.stop(new ProtocolAdapterStopInputImpl(), adapterStopOutput); - return adapterStopOutput.getOutputFuture(); + if (protocolAdapterWrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED) { + protocolAdapterWrapper.stop(new ProtocolAdapterStopInputImpl(), adapterStopOutput); + return adapterStopOutput.getOutputFuture(); + } else { + return CompletableFuture.completedFuture(null); + } + }, executorService).thenApply(input -> { log.info("Protocol-adapter '{}' stopped successfully.", protocolAdapterWrapper.getId()); protocolAdapterWrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); @@ -465,12 +468,9 @@ public boolean deleteAdapter(final @NotNull String id) { if (adapterWrapper != null) { protocolAdapterMetrics.decreaseProtocolAdapterMetric(adapterWrapper.getAdapterInformation() .getProtocolId()); - try { - if (adapterWrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED) { - // FIXME: We need to adapt the whole flow to async - stop(adapterWrapper).get(); - } + // stop in any case as some resources must be cleaned up even if the adapter is still being started and is not yet in started state + stop(adapterWrapper).get(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } catch (final ExecutionException e) {