From e29e65a7df38f3164c9621e66402bb5495a5f841 Mon Sep 17 00:00:00 2001 From: Florian-Limpoeck Date: Tue, 1 Oct 2024 18:21:10 +0200 Subject: [PATCH] bidirectional > removed retry mechanism, improved error-handling, cleaned up code, refactored a few things. --- .../mqtt/services/PublishPollService.java | 10 ++++-- .../ClientQueueLocalPersistence.java | 34 +++++++++++++------ .../clientqueue/ClientQueuePersistence.java | 33 ++++++++++++++---- .../ClientQueuePersistenceImpl.java | 12 +++---- .../ClientQueueMemoryLocalPersistence.java | 6 ++-- .../protocols/ProtocolAdapterManager.java | 2 +- .../file/FileProtocolAdapterInformation.java | 2 +- .../adapters/http/HttpProtocolAdapter.java | 14 ++++---- .../adapters/opcua/OpcUaProtocolAdapter.java | 8 ++--- 9 files changed, 78 insertions(+), 43 deletions(-) diff --git a/hivemq-edge/src/main/java/com/hivemq/mqtt/services/PublishPollService.java b/hivemq-edge/src/main/java/com/hivemq/mqtt/services/PublishPollService.java index 65b357a34..b0dc21e92 100644 --- a/hivemq-edge/src/main/java/com/hivemq/mqtt/services/PublishPollService.java +++ b/hivemq-edge/src/main/java/com/hivemq/mqtt/services/PublishPollService.java @@ -74,9 +74,13 @@ public interface PublishPollService { * @param subscriptionIdentifier of the clients subscription * @param channel to which the messages are sent */ - void pollSharedPublishesForClient(@NotNull String client, @NotNull String sharedSubscription, int qos, - boolean retainAsPublished, @Nullable Integer subscriptionIdentifier, - @NotNull Channel channel); + void pollSharedPublishesForClient( + @NotNull String client, + @NotNull String sharedSubscription, + int qos, + boolean retainAsPublished, + @Nullable Integer subscriptionIdentifier, + @NotNull Channel channel); /** * Remove a message form the client queue. diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueLocalPersistence.java b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueLocalPersistence.java index adf1e422e..f8dad022a 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueLocalPersistence.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueLocalPersistence.java @@ -49,11 +49,17 @@ public interface ClientQueueLocalPersistence extends LocalPersistence { * @param bucketIndex provided by the single writer */ void add( - @NotNull String queueId, boolean shared, @NotNull PUBLISH publish, long max, - @NotNull QueuedMessagesStrategy strategy, boolean retained, int bucketIndex); + @NotNull String queueId, + boolean shared, + @NotNull PUBLISH publish, + long max, + @NotNull QueuedMessagesStrategy strategy, + boolean retained, + int bucketIndex); /** - * Adds a list of PUBLISHes to a client or shared subscription queue. If the size exceeds the queue limit, the given PUBLISH + * Adds a list of PUBLISHes to a client or shared subscription queue. If the size exceeds the queue limit, the given + * PUBLISH * or the oldest PUBLISH in the queue will be dropped dependent on the queued messages strategy. * * @param queueId for which the PUBLISH will be queued @@ -67,8 +73,13 @@ void add( * @param bucketIndex provided by the single writer */ void add( - @NotNull String queueId, boolean shared, @NotNull List publishes, long max, - @NotNull QueuedMessagesStrategy strategy, boolean retained, int bucketIndex); + @NotNull String queueId, + boolean shared, + @NotNull List publishes, + long max, + @NotNull QueuedMessagesStrategy strategy, + boolean retained, + int bucketIndex); /** * Returns a batch of PUBLISHes and marks them by setting packet identifiers. The size of the batch is limited by 2 @@ -90,7 +101,10 @@ void add( */ @NotNull ImmutableList readNew( - @NotNull String queueId, boolean shared, @NotNull ImmutableIntArray packetIds, long bytesLimit, + @NotNull String queueId, + boolean shared, + @NotNull ImmutableIntArray packetIds, + long bytesLimit, int bucketIndex); /** @@ -200,9 +214,9 @@ ImmutableList readInflight( /** * Remove the in-flight marker of a PUBLISH with a given unique ID. * - * @param sharedSubscription for which the marker is removed - * @param uniqueId of the affected message - * @param bucketIndex provided by the single writer + * @param queueId for which the marker is removed + * @param uniqueId of the affected message + * @param bucketIndex provided by the single writer */ - void removeInFlightMarker(@NotNull String sharedSubscription, @NotNull String uniqueId, int bucketIndex); + void removeInFlightMarker(@NotNull String queueId, @NotNull String uniqueId, int bucketIndex); } diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistence.java b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistence.java index 4b2f28af8..c108fee92 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistence.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistence.java @@ -41,8 +41,12 @@ public interface ClientQueuePersistence { * @param queueLimit of the client session or the default configuration. */ @NotNull - ListenableFuture add(@NotNull String queueId, boolean shared, @NotNull PUBLISH publish, boolean retained, - long queueLimit); + ListenableFuture add( + @NotNull String queueId, + boolean shared, + @NotNull PUBLISH publish, + boolean retained, + long queueLimit); /** * Add a list of publishes to the queue. @@ -56,8 +60,12 @@ ListenableFuture add(@NotNull String queueId, boolean shared, @NotNull PUB * @param queueLimit of the client session or the default configuration. */ @NotNull - ListenableFuture add(@NotNull String queueId, boolean shared, @NotNull List publishes, boolean retained, - final long queueLimit); + ListenableFuture add( + @NotNull String queueId, + boolean shared, + @NotNull List publishes, + boolean retained, + final long queueLimit); /** * Read publishes that are not yet in-flight. @@ -71,7 +79,11 @@ ListenableFuture add(@NotNull String queueId, boolean shared, @NotNull Lis * @return The read publishes */ @NotNull - ListenableFuture> readNew(@NotNull String queueId, boolean shared, @NotNull ImmutableIntArray packetIds, long byteLimit); + ListenableFuture> readNew( + @NotNull String queueId, + boolean shared, + @NotNull ImmutableIntArray packetIds, + long byteLimit); /** * Read publishes and pubrels that are in-flight. @@ -82,7 +94,10 @@ ListenableFuture add(@NotNull String queueId, boolean shared, @NotNull Lis * @return The read messages */ @NotNull - ListenableFuture> readInflight(@NotNull String client, long byteLimit, int messageLimit); + ListenableFuture> readInflight( + @NotNull String client, + long byteLimit, + int messageLimit); /** * Remove the entry for a given packet ID. @@ -146,7 +161,10 @@ ListenableFuture add(@NotNull String queueId, boolean shared, @NotNull Lis * @return The read publishes */ @NotNull - ListenableFuture> readShared(@NotNull String sharedSubscription, int messageLimit, long byteLimit); + ListenableFuture> readShared( + @NotNull String sharedSubscription, + int messageLimit, + long byteLimit); /** * Remove a PUBLISH which has the same unique ID as the one that is provided. @@ -198,6 +216,7 @@ ListenableFuture add(@NotNull String queueId, boolean shared, @NotNull Lis void sharedPublishAvailable(@NotNull String sharedSubscription); void addPublishAvailableCallback(@NotNull PublishAvailableCallback callback, @NotNull String queueId); + void removePublishAvailableCallback(@NotNull String queueId); interface PublishAvailableCallback { diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java index 2849cbbdd..8fb4278f1 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java @@ -184,15 +184,15 @@ public void publishAvailable(final @NotNull String client) { } @Override - public void sharedPublishAvailable(final @NotNull String sharedSubscription) { - if (sharedSubscription.startsWith(MessageForwarderImpl.FORWARDER_PREFIX)) { - messageForwarder.messageAvailable(sharedSubscription); + public void sharedPublishAvailable(final @NotNull String queueId) { + if (queueId.startsWith(MessageForwarderImpl.FORWARDER_PREFIX)) { + messageForwarder.messageAvailable(queueId); } else { - final PublishAvailableCallback availableCallback = queueidCallbackMap.get(sharedSubscription); + final PublishAvailableCallback availableCallback = queueidCallbackMap.get(queueId); if (availableCallback != null) { - availableCallback.onPublishAvailable(sharedSubscription); + availableCallback.onPublishAvailable(queueId); } else { - publishPollService.get().pollSharedPublishes(sharedSubscription); + publishPollService.get().pollSharedPublishes(queueId); } } } diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java b/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java index 373de52b3..b62a67d49 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java @@ -690,14 +690,14 @@ public void removeShared( @Override @ExecuteInSingleWriter public void removeInFlightMarker( - final @NotNull String sharedSubscription, final @NotNull String uniqueId, final int bucketIndex) { + final @NotNull String queueId, final @NotNull String uniqueId, final int bucketIndex) { - checkNotNull(sharedSubscription, "Shared subscription must not be null"); + checkNotNull(queueId, "QueueId must not be null"); checkNotNull(uniqueId, "Unique id must not be null"); ThreadPreConditions.startsWith(SINGLE_WRITER_THREAD_PREFIX); final Map bucket = sharedBuckets[bucketIndex]; - final Messages messages = bucket.get(sharedSubscription); + final Messages messages = bucket.get(queueId); if (messages == null) { return; } diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java b/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java index aba03d6da..94c71013a 100644 --- a/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java @@ -281,7 +281,6 @@ CompletableFuture start(final @NotNull ProtocolAdapterWrapper protocolAdap schedulePolling(protocolAdapterWrapper); return startWriting(protocolAdapterWrapper); }, executorService).thenApplyAsync(unused -> { - protocolAdapterWrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED); eventService.createAdapterEvent(protocolAdapterWrapper.getId(), protocolAdapterWrapper.getProtocolAdapterInformation().getProtocolId()) .withSeverity(Event.SEVERITY.INFO) @@ -294,6 +293,7 @@ CompletableFuture start(final @NotNull ProtocolAdapterWrapper protocolAdap adapterCreatedEvent.addUserData("adapterType", protocolAdapterWrapper.getProtocolAdapterInformation().getProtocolId()); remoteService.fireUsageEvent(adapterCreatedEvent); + protocolAdapterWrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED); return null; }, executorService).exceptionally(throwable -> { try { diff --git a/modules/hivemq-edge-module-file/src/main/java/com/hivemq/edge/adapters/file/FileProtocolAdapterInformation.java b/modules/hivemq-edge-module-file/src/main/java/com/hivemq/edge/adapters/file/FileProtocolAdapterInformation.java index b36727ee0..00bdc85c9 100644 --- a/modules/hivemq-edge-module-file/src/main/java/com/hivemq/edge/adapters/file/FileProtocolAdapterInformation.java +++ b/modules/hivemq-edge-module-file/src/main/java/com/hivemq/edge/adapters/file/FileProtocolAdapterInformation.java @@ -46,7 +46,7 @@ protected FileProtocolAdapterInformation() { @Override public @NotNull String getProtocolId() { - return "file_input"; + return "file"; } @Override diff --git a/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpProtocolAdapter.java b/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpProtocolAdapter.java index bc25b68f3..918088834 100644 --- a/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpProtocolAdapter.java +++ b/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpProtocolAdapter.java @@ -278,9 +278,7 @@ public int getMaxPollingErrorsBeforeRemoval() { @Override public void write(final @NotNull WritingInput writingInput, final @NotNull WritingOutput writingOutput) { if (httpClient == null) { - writingOutput.fail(new ProtocolAdapterException(), - "No response was created, because the client is null.", - false); + writingOutput.fail(new ProtocolAdapterException(), "No response was created, because the client is null."); return; } @@ -307,8 +305,7 @@ public void write(final @NotNull WritingInput writingInput, final @NotNull Writi writingOutput.fail(new IllegalStateException("Unsupported request method: " + mqttToHttpMapping.getHttpRequestMethod()), "There was an unexpected value present in the request config: " + - mqttToHttpMapping.getHttpRequestMethod(), - false); + mqttToHttpMapping.getHttpRequestMethod()); return; } @@ -319,9 +316,10 @@ public void write(final @NotNull WritingInput writingInput, final @NotNull Writi if (isSuccessStatusCode(httpResponse.statusCode())) { writingOutput.finish(); } else { - // TODO when retry? - writingOutput.fail("Forwarding a message from topic failed with reason code " + - httpResponse.statusCode(), false); + writingOutput.fail(String.format( + "Forwarding a message to url '%s' failed with status code '%d", + mqttToHttpMapping.getUrl(), + httpResponse.statusCode())); } }); } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java index 39b745104..417189333 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java @@ -256,7 +256,7 @@ public void write(final @NotNull WritingInput writingInput, final @NotNull Writi opcUaObject = Objects.requireNonNull(jsonToOpcUAConverter) .convertToOpcUAValue(opcUAWritePayload.getValue(), nodeId); } catch (final Exception e) { - writingOutput.fail(e.getMessage(), false); + writingOutput.fail(e.getMessage()); return; } @@ -264,21 +264,21 @@ public void write(final @NotNull WritingInput writingInput, final @NotNull Writi final DataValue dataValue = new DataValue(variant, null, null); if (opcUaClient == null) { log.warn("Client is not connected."); - writingOutput.fail("Client is not connected.", true); + writingOutput.fail("Client is not connected."); return; } final CompletableFuture writeFuture = opcUaClient.writeValue(nodeId, dataValue); writeFuture.whenComplete((statusCode, throwable) -> { if (throwable != null) { log.error("Exception while writing to opcua node '{}'", writeContext.getNode(), throwable); - writingOutput.fail(throwable, null, false); + writingOutput.fail(throwable, null); } else { log.info("Wrote '{}' to nodeId={}", variant, nodeId); writingOutput.finish(); } }); } catch (final IllegalArgumentException illegalArgumentException) { - writingOutput.fail(illegalArgumentException, null, false); + writingOutput.fail(illegalArgumentException, null); } }