Skip to content

Commit

Permalink
bidirectional > removed retry mechanism, improved error-handling, cle…
Browse files Browse the repository at this point in the history
…aned up code, refactored a few things.
  • Loading branch information
Florian-Limpoeck committed Oct 2, 2024
1 parent 83a5a77 commit e29e65a
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@ public interface PublishPollService {
* @param subscriptionIdentifier of the clients subscription
* @param channel to which the messages are sent
*/
void pollSharedPublishesForClient(@NotNull String client, @NotNull String sharedSubscription, int qos,
boolean retainAsPublished, @Nullable Integer subscriptionIdentifier,
@NotNull Channel channel);
void pollSharedPublishesForClient(
@NotNull String client,
@NotNull String sharedSubscription,
int qos,
boolean retainAsPublished,
@Nullable Integer subscriptionIdentifier,
@NotNull Channel channel);

/**
* Remove a message form the client queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,17 @@ public interface ClientQueueLocalPersistence extends LocalPersistence {
* @param bucketIndex provided by the single writer
*/
void add(
@NotNull String queueId, boolean shared, @NotNull PUBLISH publish, long max,
@NotNull QueuedMessagesStrategy strategy, boolean retained, int bucketIndex);
@NotNull String queueId,
boolean shared,
@NotNull PUBLISH publish,
long max,
@NotNull QueuedMessagesStrategy strategy,
boolean retained,
int bucketIndex);

/**
* Adds a list of PUBLISHes to a client or shared subscription queue. If the size exceeds the queue limit, the given PUBLISH
* Adds a list of PUBLISHes to a client or shared subscription queue. If the size exceeds the queue limit, the given
* PUBLISH
* or the oldest PUBLISH in the queue will be dropped dependent on the queued messages strategy.
*
* @param queueId for which the PUBLISH will be queued
Expand All @@ -67,8 +73,13 @@ void add(
* @param bucketIndex provided by the single writer
*/
void add(
@NotNull String queueId, boolean shared, @NotNull List<PUBLISH> publishes, long max,
@NotNull QueuedMessagesStrategy strategy, boolean retained, int bucketIndex);
@NotNull String queueId,
boolean shared,
@NotNull List<PUBLISH> publishes,
long max,
@NotNull QueuedMessagesStrategy strategy,
boolean retained,
int bucketIndex);

/**
* Returns a batch of PUBLISHes and marks them by setting packet identifiers. The size of the batch is limited by 2
Expand All @@ -90,7 +101,10 @@ void add(
*/
@NotNull
ImmutableList<PUBLISH> readNew(
@NotNull String queueId, boolean shared, @NotNull ImmutableIntArray packetIds, long bytesLimit,
@NotNull String queueId,
boolean shared,
@NotNull ImmutableIntArray packetIds,
long bytesLimit,
int bucketIndex);

/**
Expand Down Expand Up @@ -200,9 +214,9 @@ ImmutableList<MessageWithID> readInflight(
/**
* Remove the in-flight marker of a PUBLISH with a given unique ID.
*
* @param sharedSubscription for which the marker is removed
* @param uniqueId of the affected message
* @param bucketIndex provided by the single writer
* @param queueId for which the marker is removed
* @param uniqueId of the affected message
* @param bucketIndex provided by the single writer
*/
void removeInFlightMarker(@NotNull String sharedSubscription, @NotNull String uniqueId, int bucketIndex);
void removeInFlightMarker(@NotNull String queueId, @NotNull String uniqueId, int bucketIndex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ public interface ClientQueuePersistence {
* @param queueLimit of the client session or the default configuration.
*/
@NotNull
ListenableFuture<Void> add(@NotNull String queueId, boolean shared, @NotNull PUBLISH publish, boolean retained,
long queueLimit);
ListenableFuture<Void> add(
@NotNull String queueId,
boolean shared,
@NotNull PUBLISH publish,
boolean retained,
long queueLimit);

/**
* Add a list of publishes to the queue.
Expand All @@ -56,8 +60,12 @@ ListenableFuture<Void> add(@NotNull String queueId, boolean shared, @NotNull PUB
* @param queueLimit of the client session or the default configuration.
*/
@NotNull
ListenableFuture<Void> add(@NotNull String queueId, boolean shared, @NotNull List<PUBLISH> publishes, boolean retained,
final long queueLimit);
ListenableFuture<Void> add(
@NotNull String queueId,
boolean shared,
@NotNull List<PUBLISH> publishes,
boolean retained,
final long queueLimit);

/**
* Read publishes that are not yet in-flight.
Expand All @@ -71,7 +79,11 @@ ListenableFuture<Void> add(@NotNull String queueId, boolean shared, @NotNull Lis
* @return The read publishes
*/
@NotNull
ListenableFuture<ImmutableList<PUBLISH>> readNew(@NotNull String queueId, boolean shared, @NotNull ImmutableIntArray packetIds, long byteLimit);
ListenableFuture<ImmutableList<PUBLISH>> readNew(
@NotNull String queueId,
boolean shared,
@NotNull ImmutableIntArray packetIds,
long byteLimit);

/**
* Read publishes and pubrels that are in-flight.
Expand All @@ -82,7 +94,10 @@ ListenableFuture<Void> add(@NotNull String queueId, boolean shared, @NotNull Lis
* @return The read messages
*/
@NotNull
ListenableFuture<ImmutableList<MessageWithID>> readInflight(@NotNull String client, long byteLimit, int messageLimit);
ListenableFuture<ImmutableList<MessageWithID>> readInflight(
@NotNull String client,
long byteLimit,
int messageLimit);

/**
* Remove the entry for a given packet ID.
Expand Down Expand Up @@ -146,7 +161,10 @@ ListenableFuture<Void> add(@NotNull String queueId, boolean shared, @NotNull Lis
* @return The read publishes
*/
@NotNull
ListenableFuture<ImmutableList<PUBLISH>> readShared(@NotNull String sharedSubscription, int messageLimit, long byteLimit);
ListenableFuture<ImmutableList<PUBLISH>> readShared(
@NotNull String sharedSubscription,
int messageLimit,
long byteLimit);

/**
* Remove a PUBLISH which has the same unique ID as the one that is provided.
Expand Down Expand Up @@ -198,6 +216,7 @@ ListenableFuture<Void> add(@NotNull String queueId, boolean shared, @NotNull Lis
void sharedPublishAvailable(@NotNull String sharedSubscription);

void addPublishAvailableCallback(@NotNull PublishAvailableCallback callback, @NotNull String queueId);

void removePublishAvailableCallback(@NotNull String queueId);

interface PublishAvailableCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,15 @@ public void publishAvailable(final @NotNull String client) {
}

@Override
public void sharedPublishAvailable(final @NotNull String sharedSubscription) {
if (sharedSubscription.startsWith(MessageForwarderImpl.FORWARDER_PREFIX)) {
messageForwarder.messageAvailable(sharedSubscription);
public void sharedPublishAvailable(final @NotNull String queueId) {
if (queueId.startsWith(MessageForwarderImpl.FORWARDER_PREFIX)) {
messageForwarder.messageAvailable(queueId);
} else {
final PublishAvailableCallback availableCallback = queueidCallbackMap.get(sharedSubscription);
final PublishAvailableCallback availableCallback = queueidCallbackMap.get(queueId);
if (availableCallback != null) {
availableCallback.onPublishAvailable(sharedSubscription);
availableCallback.onPublishAvailable(queueId);
} else {
publishPollService.get().pollSharedPublishes(sharedSubscription);
publishPollService.get().pollSharedPublishes(queueId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,14 +690,14 @@ public void removeShared(
@Override
@ExecuteInSingleWriter
public void removeInFlightMarker(
final @NotNull String sharedSubscription, final @NotNull String uniqueId, final int bucketIndex) {
final @NotNull String queueId, final @NotNull String uniqueId, final int bucketIndex) {

checkNotNull(sharedSubscription, "Shared subscription must not be null");
checkNotNull(queueId, "QueueId must not be null");
checkNotNull(uniqueId, "Unique id must not be null");
ThreadPreConditions.startsWith(SINGLE_WRITER_THREAD_PREFIX);

final Map<String, Messages> bucket = sharedBuckets[bucketIndex];
final Messages messages = bucket.get(sharedSubscription);
final Messages messages = bucket.get(queueId);
if (messages == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ CompletableFuture<Void> start(final @NotNull ProtocolAdapterWrapper protocolAdap
schedulePolling(protocolAdapterWrapper);
return startWriting(protocolAdapterWrapper);
}, executorService).<Void>thenApplyAsync(unused -> {
protocolAdapterWrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED);
eventService.createAdapterEvent(protocolAdapterWrapper.getId(),
protocolAdapterWrapper.getProtocolAdapterInformation().getProtocolId())
.withSeverity(Event.SEVERITY.INFO)
Expand All @@ -294,6 +293,7 @@ CompletableFuture<Void> start(final @NotNull ProtocolAdapterWrapper protocolAdap
adapterCreatedEvent.addUserData("adapterType",
protocolAdapterWrapper.getProtocolAdapterInformation().getProtocolId());
remoteService.fireUsageEvent(adapterCreatedEvent);
protocolAdapterWrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED);
return null;
}, executorService).exceptionally(throwable -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected FileProtocolAdapterInformation() {

@Override
public @NotNull String getProtocolId() {
return "file_input";
return "file";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,7 @@ public int getMaxPollingErrorsBeforeRemoval() {
@Override
public void write(final @NotNull WritingInput writingInput, final @NotNull WritingOutput writingOutput) {
if (httpClient == null) {
writingOutput.fail(new ProtocolAdapterException(),
"No response was created, because the client is null.",
false);
writingOutput.fail(new ProtocolAdapterException(), "No response was created, because the client is null.");
return;
}

Expand All @@ -307,8 +305,7 @@ public void write(final @NotNull WritingInput writingInput, final @NotNull Writi
writingOutput.fail(new IllegalStateException("Unsupported request method: " +
mqttToHttpMapping.getHttpRequestMethod()),
"There was an unexpected value present in the request config: " +
mqttToHttpMapping.getHttpRequestMethod(),
false);
mqttToHttpMapping.getHttpRequestMethod());
return;
}

Expand All @@ -319,9 +316,10 @@ public void write(final @NotNull WritingInput writingInput, final @NotNull Writi
if (isSuccessStatusCode(httpResponse.statusCode())) {
writingOutput.finish();
} else {
// TODO when retry?
writingOutput.fail("Forwarding a message from topic failed with reason code " +
httpResponse.statusCode(), false);
writingOutput.fail(String.format(
"Forwarding a message to url '%s' failed with status code '%d",
mqttToHttpMapping.getUrl(),
httpResponse.statusCode()));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,29 +256,29 @@ public void write(final @NotNull WritingInput writingInput, final @NotNull Writi
opcUaObject = Objects.requireNonNull(jsonToOpcUAConverter)
.convertToOpcUAValue(opcUAWritePayload.getValue(), nodeId);
} catch (final Exception e) {
writingOutput.fail(e.getMessage(), false);
writingOutput.fail(e.getMessage());
return;
}

final Variant variant = new Variant(opcUaObject);
final DataValue dataValue = new DataValue(variant, null, null);
if (opcUaClient == null) {
log.warn("Client is not connected.");
writingOutput.fail("Client is not connected.", true);
writingOutput.fail("Client is not connected.");
return;
}
final CompletableFuture<StatusCode> writeFuture = opcUaClient.writeValue(nodeId, dataValue);
writeFuture.whenComplete((statusCode, throwable) -> {
if (throwable != null) {
log.error("Exception while writing to opcua node '{}'", writeContext.getNode(), throwable);
writingOutput.fail(throwable, null, false);
writingOutput.fail(throwable, null);
} else {
log.info("Wrote '{}' to nodeId={}", variant, nodeId);
writingOutput.finish();
}
});
} catch (final IllegalArgumentException illegalArgumentException) {
writingOutput.fail(illegalArgumentException, null, false);
writingOutput.fail(illegalArgumentException, null);
}
}

Expand Down

0 comments on commit e29e65a

Please sign in to comment.