From 1f19344c97008ebc5a2e684ffd9bc5561e7f3f4d Mon Sep 17 00:00:00 2001 From: Lukas Brand Date: Tue, 26 Sep 2023 17:15:26 +0200 Subject: [PATCH] use finally block for manual acknowledgment --- .../mqtt/SubscribeMqtt3PublishCallback.java | 58 ++++++++++--------- .../mqtt/SubscribeMqtt5PublishCallback.java | 58 ++++++++++--------- 2 files changed, 62 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/hivemq/cli/mqtt/SubscribeMqtt3PublishCallback.java b/src/main/java/com/hivemq/cli/mqtt/SubscribeMqtt3PublishCallback.java index 0ab090283..a06d43dea 100644 --- a/src/main/java/com/hivemq/cli/mqtt/SubscribeMqtt3PublishCallback.java +++ b/src/main/java/com/hivemq/cli/mqtt/SubscribeMqtt3PublishCallback.java @@ -50,39 +50,43 @@ public class SubscribeMqtt3PublishCallback implements Consumer { @Override public void accept(final @NotNull Mqtt3Publish mqtt3Publish) { - String message; try { - if (isJsonOutput) { - message = new JsonMqttPublish(mqtt3Publish, isBase64).toString(); - } else { - message = MqttPublishUtils.formatPayload(mqtt3Publish.getPayloadAsBytes(), isBase64); - } + String message; + try { + if (isJsonOutput) { + message = new JsonMqttPublish(mqtt3Publish, isBase64).toString(); + } else { + message = MqttPublishUtils.formatPayload(mqtt3Publish.getPayloadAsBytes(), isBase64); + } - if (showTopics) { - message = mqtt3Publish.getTopic() + ": " + message; - } + if (showTopics) { + message = mqtt3Publish.getTopic() + ": " + message; + } - Logger.debug("{} received PUBLISH ('{}')\n {}", - LoggerUtils.getClientPrefix(client.getConfig()), - new String(mqtt3Publish.getPayloadAsBytes(), StandardCharsets.UTF_8), - mqtt3Publish); - } catch (final Exception e) { - Logger.error("An error occurred while processing an incoming PUBLISH.", e); - mqtt3Publish.acknowledge(); - return; - } + Logger.debug("{} received PUBLISH ('{}')\n {}", + LoggerUtils.getClientPrefix(client.getConfig()), + new String(mqtt3Publish.getPayloadAsBytes(), StandardCharsets.UTF_8), + mqtt3Publish); + } catch (final Exception e) { + Logger.error("An error occurred while processing an incoming PUBLISH.", e); + return; + } - if (outputFile != null) { - MqttPublishUtils.printToFile(outputFile, message); - } + if (outputFile != null) { + MqttPublishUtils.printToFile(outputFile, message); + } - if (printToStdout) { - if (System.out.checkError()) { - //TODO: Handle SIGPIPE - //throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED"); + if (printToStdout) { + if (System.out.checkError()) { + //TODO: Handle SIGPIPE + //throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED"); + } + System.out.println(message); } - System.out.println(message); + + } finally { + //Necessary to ensure log ordering + mqtt3Publish.acknowledge(); } - mqtt3Publish.acknowledge(); } } diff --git a/src/main/java/com/hivemq/cli/mqtt/SubscribeMqtt5PublishCallback.java b/src/main/java/com/hivemq/cli/mqtt/SubscribeMqtt5PublishCallback.java index 4e5d5fc83..143c80795 100644 --- a/src/main/java/com/hivemq/cli/mqtt/SubscribeMqtt5PublishCallback.java +++ b/src/main/java/com/hivemq/cli/mqtt/SubscribeMqtt5PublishCallback.java @@ -50,39 +50,43 @@ public class SubscribeMqtt5PublishCallback implements Consumer { @Override public void accept(final @NotNull Mqtt5Publish mqtt5Publish) { - String message; try { - if (isJsonOutput) { - message = new JsonMqttPublish(mqtt5Publish, isBase64).toString(); - } else { - message = MqttPublishUtils.formatPayload(mqtt5Publish.getPayloadAsBytes(), isBase64); - } + String message; + try { + if (isJsonOutput) { + message = new JsonMqttPublish(mqtt5Publish, isBase64).toString(); + } else { + message = MqttPublishUtils.formatPayload(mqtt5Publish.getPayloadAsBytes(), isBase64); + } - if (showTopics) { - message = mqtt5Publish.getTopic() + ": " + message; - } + if (showTopics) { + message = mqtt5Publish.getTopic() + ": " + message; + } - Logger.debug("{} received PUBLISH ('{}')\n {}", - LoggerUtils.getClientPrefix(client.getConfig()), - new String(mqtt5Publish.getPayloadAsBytes(), StandardCharsets.UTF_8), - mqtt5Publish); - } catch (final Exception e) { - Logger.error("An error occurred while processing an incoming PUBLISH.", e); - mqtt5Publish.acknowledge(); - return; - } + Logger.debug("{} received PUBLISH ('{}')\n {}", + LoggerUtils.getClientPrefix(client.getConfig()), + new String(mqtt5Publish.getPayloadAsBytes(), StandardCharsets.UTF_8), + mqtt5Publish); + } catch (final Exception e) { + Logger.error("An error occurred while processing an incoming PUBLISH.", e); + return; + } - if (outputFile != null) { - MqttPublishUtils.printToFile(outputFile, message); - } + if (outputFile != null) { + MqttPublishUtils.printToFile(outputFile, message); + } - if (printToStdout) { - if (System.out.checkError()) { - //TODO: Handle SIGPIPE - //throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED"); + if (printToStdout) { + if (System.out.checkError()) { + //TODO: Handle SIGPIPE + //throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED"); + } + System.out.println(message); } - System.out.println(message); + + } finally { + //Necessary to ensure log ordering + mqtt5Publish.acknowledge(); } - mqtt5Publish.acknowledge(); } }