Skip to content

Commit

Permalink
move status check of the adapters to enable cleanup of data policies …
Browse files Browse the repository at this point in the history
…that were created by not yet started adapters
  • Loading branch information
DC2-DanielKrueger committed Oct 15, 2024
1 parent 54c1eef commit 085e5e3
Showing 1 changed file with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState;
import com.hivemq.adapter.sdk.api.writing.WritingContext;
import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter;
import com.hivemq.bootstrap.factories.WritingServiceProvider;
import com.hivemq.configuration.service.ConfigurationService;
import com.hivemq.edge.HiveMQEdgeRemoteService;
import com.hivemq.edge.VersionProvider;
Expand Down Expand Up @@ -318,8 +317,7 @@ CompletableFuture<Void> start(final @NotNull ProtocolAdapterWrapper protocolAdap

private @NotNull CompletableFuture<Void> startWriting(final @NotNull ProtocolAdapterWrapper protocolAdapterWrapper) {
final CompletableFuture<Void> startWritingFuture;
if (writingEnabled() &&
protocolAdapterWrapper.getAdapter() instanceof WritingProtocolAdapter) {
if (writingEnabled() && protocolAdapterWrapper.getAdapter() instanceof WritingProtocolAdapter) {
if (log.isDebugEnabled()) {
log.debug("Start writing for protocol adapter with id '{}'", protocolAdapterWrapper.getId());
}
Expand Down Expand Up @@ -377,8 +375,13 @@ private void schedulePolling(final @NotNull ProtocolAdapterWrapper protocolAdapt

return stopWritingFuture.thenComposeAsync(ignored -> {
final ProtocolAdapterStopOutputImpl adapterStopOutput = new ProtocolAdapterStopOutputImpl();
protocolAdapterWrapper.stop(new ProtocolAdapterStopInputImpl(), adapterStopOutput);
return adapterStopOutput.getOutputFuture();
if (protocolAdapterWrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED) {
protocolAdapterWrapper.stop(new ProtocolAdapterStopInputImpl(), adapterStopOutput);
return adapterStopOutput.getOutputFuture();
} else {
return CompletableFuture.completedFuture(null);
}

}, executorService).<Void>thenApply(input -> {
log.info("Protocol-adapter '{}' stopped successfully.", protocolAdapterWrapper.getId());
protocolAdapterWrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
Expand Down Expand Up @@ -465,12 +468,9 @@ public boolean deleteAdapter(final @NotNull String id) {
if (adapterWrapper != null) {
protocolAdapterMetrics.decreaseProtocolAdapterMetric(adapterWrapper.getAdapterInformation()
.getProtocolId());

try {
if (adapterWrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED) {
// FIXME: We need to adapt the whole flow to async
stop(adapterWrapper).get();
}
// stop in any case as some resources must be cleaned up even if the adapter is still being started and is not yet in started state
stop(adapterWrapper).get();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final ExecutionException e) {
Expand Down

0 comments on commit 085e5e3

Please sign in to comment.