From 7208119b6a4d349b0ca2888a0cf3482447e023e3 Mon Sep 17 00:00:00 2001 From: hantsy Date: Mon, 29 Apr 2024 17:53:52 +0800 Subject: [PATCH] chore: clean codes --- .../src/main/java/com/example/MessageHandler.java | 2 +- .../src/main/java/com/example/MessageResource.java | 2 +- .../src/test/java/com/example/InMemoryProfile.java | 2 +- .../src/test/java/com/example/MessageHandlerTest.java | 2 +- amqp/src/main/java/com/example/MessageHandler.java | 2 +- amqp/src/main/java/com/example/MessageResource.java | 2 +- amqp/src/test/java/com/example/InMemoryProfile.java | 2 +- amqp/src/test/java/com/example/MessageHandlerTest.java | 2 +- kafka/src/main/java/com/example/MessageHandler.java | 2 +- kafka/src/main/java/com/example/MessageResource.java | 2 +- kafka/src/test/java/com/example/InMemoryProfile.java | 8 +++++--- kafka/src/test/java/com/example/MessageHandlerTest.java | 2 +- kafka/src/test/java/com/example/MessageResourceTest.java | 2 +- pulsar/src/main/java/com/example/MessageHandler.java | 2 +- pulsar/src/main/java/com/example/MessageResource.java | 2 +- pulsar/src/test/java/com/example/InMemoryProfile.java | 3 ++- pulsar/src/test/java/com/example/MessageHandlerTest.java | 2 +- pulsar/src/test/java/com/example/MessageResourceTest.java | 2 +- rabbitmq/src/main/java/com/example/MessageHandler.java | 2 +- rabbitmq/src/main/java/com/example/MessageResource.java | 2 +- rabbitmq/src/test/java/com/example/InMemoryProfile.java | 2 +- .../src/test/java/com/example/MessageHandlerTest.java | 2 +- 22 files changed, 27 insertions(+), 24 deletions(-) diff --git a/amqp-rabbitmq/src/main/java/com/example/MessageHandler.java b/amqp-rabbitmq/src/main/java/com/example/MessageHandler.java index 74b6adaa7..a1adc2b2f 100644 --- a/amqp-rabbitmq/src/main/java/com/example/MessageHandler.java +++ b/amqp-rabbitmq/src/main/java/com/example/MessageHandler.java @@ -23,7 +23,7 @@ public void send(String message) { } @Incoming("messages") - @Outgoing("data-stream") + @Outgoing("messages-stream") @Broadcast Message receive(String message) { log.info("received: {}", message); diff --git a/amqp-rabbitmq/src/main/java/com/example/MessageResource.java b/amqp-rabbitmq/src/main/java/com/example/MessageResource.java index c1078bd3d..2edc3cca9 100644 --- a/amqp-rabbitmq/src/main/java/com/example/MessageResource.java +++ b/amqp-rabbitmq/src/main/java/com/example/MessageResource.java @@ -17,7 +17,7 @@ public class MessageResource { MessageHandler handler; @Inject - @Channel("data-stream") + @Channel("messages-stream") // Publisher stream; // see: https://github.com/quarkusio/quarkus/issues/35219 Flow.Publisher stream; diff --git a/amqp-rabbitmq/src/test/java/com/example/InMemoryProfile.java b/amqp-rabbitmq/src/test/java/com/example/InMemoryProfile.java index 6c4f59411..9d9d78ccd 100644 --- a/amqp-rabbitmq/src/test/java/com/example/InMemoryProfile.java +++ b/amqp-rabbitmq/src/test/java/com/example/InMemoryProfile.java @@ -14,7 +14,7 @@ public Map getConfigOverrides() { "quarkus.amqp.devservices.enabled", "false", "mp.messaging.outgoing.send.connector","smallrye-in-memory", "mp.messaging.incoming.messages.connector","smallrye-in-memory", - "mp.messaging.outgoing.data-stream.connector","smallrye-in-memory" + "mp.messaging.outgoing.messages-stream.connector","smallrye-in-memory" ); } diff --git a/amqp-rabbitmq/src/test/java/com/example/MessageHandlerTest.java b/amqp-rabbitmq/src/test/java/com/example/MessageHandlerTest.java index 9b1ce54a2..1315c4254 100644 --- a/amqp-rabbitmq/src/test/java/com/example/MessageHandlerTest.java +++ b/amqp-rabbitmq/src/test/java/com/example/MessageHandlerTest.java @@ -34,7 +34,7 @@ void setUp() { void receive() { InMemorySource messages = connector.source("messages"); InMemorySink sink = connector.sink("send"); - InMemorySink dataStream = connector.sink("data-stream"); + InMemorySink dataStream = connector.sink("messages-stream"); handler.send("hello"); await().atMost(Duration.ofMillis(1000)).untilAsserted(() -> diff --git a/amqp/src/main/java/com/example/MessageHandler.java b/amqp/src/main/java/com/example/MessageHandler.java index 74b6adaa7..a1adc2b2f 100644 --- a/amqp/src/main/java/com/example/MessageHandler.java +++ b/amqp/src/main/java/com/example/MessageHandler.java @@ -23,7 +23,7 @@ public void send(String message) { } @Incoming("messages") - @Outgoing("data-stream") + @Outgoing("messages-stream") @Broadcast Message receive(String message) { log.info("received: {}", message); diff --git a/amqp/src/main/java/com/example/MessageResource.java b/amqp/src/main/java/com/example/MessageResource.java index 389705e07..42be25b5c 100644 --- a/amqp/src/main/java/com/example/MessageResource.java +++ b/amqp/src/main/java/com/example/MessageResource.java @@ -19,7 +19,7 @@ public class MessageResource { MessageHandler handler; @Inject - @Channel("data-stream") + @Channel("messages-stream") // Publisher stream; // see: https://github.com/quarkusio/quarkus/issues/35219 Flow.Publisher stream; diff --git a/amqp/src/test/java/com/example/InMemoryProfile.java b/amqp/src/test/java/com/example/InMemoryProfile.java index b6f607693..49358836e 100644 --- a/amqp/src/test/java/com/example/InMemoryProfile.java +++ b/amqp/src/test/java/com/example/InMemoryProfile.java @@ -14,7 +14,7 @@ public Map getConfigOverrides() { "quarkus.amqp.devservices.enabled", "false", "mp.messaging.outgoing.send.connector", "smallrye-in-memory", "mp.messaging.incoming.messages.connector", "smallrye-in-memory", - "mp.messaging.outgoing.data-stream.connector", "smallrye-in-memory" + "mp.messaging.outgoing.messages-stream.connector", "smallrye-in-memory" ); } diff --git a/amqp/src/test/java/com/example/MessageHandlerTest.java b/amqp/src/test/java/com/example/MessageHandlerTest.java index 7e66cc49c..964325575 100644 --- a/amqp/src/test/java/com/example/MessageHandlerTest.java +++ b/amqp/src/test/java/com/example/MessageHandlerTest.java @@ -34,7 +34,7 @@ void setUp() { void receive() { InMemorySource messages = connector.source("messages"); InMemorySink sink = connector.sink("send"); - InMemorySink dataStream = connector.sink("data-stream"); + InMemorySink dataStream = connector.sink("messages-stream"); handler.send("hello"); await().atMost(Duration.ofMillis(1000)).untilAsserted(() -> diff --git a/kafka/src/main/java/com/example/MessageHandler.java b/kafka/src/main/java/com/example/MessageHandler.java index 8bab314b2..a1adc2b2f 100644 --- a/kafka/src/main/java/com/example/MessageHandler.java +++ b/kafka/src/main/java/com/example/MessageHandler.java @@ -23,7 +23,7 @@ public void send(String message) { } @Incoming("messages") - @Outgoing("data-result") + @Outgoing("messages-stream") @Broadcast Message receive(String message) { log.info("received: {}", message); diff --git a/kafka/src/main/java/com/example/MessageResource.java b/kafka/src/main/java/com/example/MessageResource.java index 9801357f8..9757018c2 100644 --- a/kafka/src/main/java/com/example/MessageResource.java +++ b/kafka/src/main/java/com/example/MessageResource.java @@ -15,7 +15,7 @@ public class MessageResource { @Inject MessageHandler handler; @Inject - @Channel("data-stream") + @Channel("messages-stream") // Publisher stream; // see: https://github.com/quarkusio/quarkus/issues/35219 Flow.Publisher stream; diff --git a/kafka/src/test/java/com/example/InMemoryProfile.java b/kafka/src/test/java/com/example/InMemoryProfile.java index f7958821e..1aad96747 100644 --- a/kafka/src/test/java/com/example/InMemoryProfile.java +++ b/kafka/src/test/java/com/example/InMemoryProfile.java @@ -12,9 +12,11 @@ public InMemoryProfile() { public Map getConfigOverrides() { return Map.of( "quarkus.kafka.devservices.enabled", "false", - "mp.messaging.outgoing.send.connector","smallrye-in-memory", - "mp.messaging.incoming.messages.connector","smallrye-in-memory", - "mp.messaging.outgoing.data-result.connector","smallrye-in-memory" + // see: https://github.com/quarkusio/quarkus/issues/40317#issuecomment-2082911239 + "quarkus.messaging.kafka.serializer-autodetection.enabled", "false", + "mp.messaging.outgoing.send.connector", "smallrye-in-memory", + "mp.messaging.incoming.messages.connector", "smallrye-in-memory", + "mp.messaging.outgoing.messages-stream.connector", "smallrye-in-memory" ); } diff --git a/kafka/src/test/java/com/example/MessageHandlerTest.java b/kafka/src/test/java/com/example/MessageHandlerTest.java index 8023c533f..1315c4254 100644 --- a/kafka/src/test/java/com/example/MessageHandlerTest.java +++ b/kafka/src/test/java/com/example/MessageHandlerTest.java @@ -34,7 +34,7 @@ void setUp() { void receive() { InMemorySource messages = connector.source("messages"); InMemorySink sink = connector.sink("send"); - InMemorySink dataStream = connector.sink("data-result"); + InMemorySink dataStream = connector.sink("messages-stream"); handler.send("hello"); await().atMost(Duration.ofMillis(1000)).untilAsserted(() -> diff --git a/kafka/src/test/java/com/example/MessageResourceTest.java b/kafka/src/test/java/com/example/MessageResourceTest.java index 25a28991d..b2e6b59ad 100644 --- a/kafka/src/test/java/com/example/MessageResourceTest.java +++ b/kafka/src/test/java/com/example/MessageResourceTest.java @@ -75,6 +75,6 @@ void testSendAndReceiveMessages() { } assertThat(messageReplay.size()).isEqualTo(1); - assertThat(messageReplay.get(0).body()).isEqualTo("hello"); + assertThat(messageReplay.getFirst().body()).isEqualTo("hello"); } } \ No newline at end of file diff --git a/pulsar/src/main/java/com/example/MessageHandler.java b/pulsar/src/main/java/com/example/MessageHandler.java index 8bab314b2..a1adc2b2f 100644 --- a/pulsar/src/main/java/com/example/MessageHandler.java +++ b/pulsar/src/main/java/com/example/MessageHandler.java @@ -23,7 +23,7 @@ public void send(String message) { } @Incoming("messages") - @Outgoing("data-result") + @Outgoing("messages-stream") @Broadcast Message receive(String message) { log.info("received: {}", message); diff --git a/pulsar/src/main/java/com/example/MessageResource.java b/pulsar/src/main/java/com/example/MessageResource.java index 389705e07..42be25b5c 100644 --- a/pulsar/src/main/java/com/example/MessageResource.java +++ b/pulsar/src/main/java/com/example/MessageResource.java @@ -19,7 +19,7 @@ public class MessageResource { MessageHandler handler; @Inject - @Channel("data-stream") + @Channel("messages-stream") // Publisher stream; // see: https://github.com/quarkusio/quarkus/issues/35219 Flow.Publisher stream; diff --git a/pulsar/src/test/java/com/example/InMemoryProfile.java b/pulsar/src/test/java/com/example/InMemoryProfile.java index 80fd43ea3..b063fdbb2 100644 --- a/pulsar/src/test/java/com/example/InMemoryProfile.java +++ b/pulsar/src/test/java/com/example/InMemoryProfile.java @@ -12,9 +12,10 @@ public InMemoryProfile() { public Map getConfigOverrides() { return Map.of( "quarkus.pulsar.devservices.enabled", "false", + "quarkus.messaging.pulsar.schema-autodetection.enabled","false", "mp.messaging.outgoing.send.connector","smallrye-in-memory", "mp.messaging.incoming.messages.connector","smallrye-in-memory", - "mp.messaging.outgoing.data-result.connector","smallrye-in-memory" + "mp.messaging.outgoing.messages-stream.connector","smallrye-in-memory" // "pulsar.client.serviceUrl", "", // "pulsar.admin.serviceUrl", "" ); diff --git a/pulsar/src/test/java/com/example/MessageHandlerTest.java b/pulsar/src/test/java/com/example/MessageHandlerTest.java index ea10eaa64..2fcffeb7e 100644 --- a/pulsar/src/test/java/com/example/MessageHandlerTest.java +++ b/pulsar/src/test/java/com/example/MessageHandlerTest.java @@ -46,7 +46,7 @@ void setUp() { void receive() { InMemorySource messages = connector.source("messages"); InMemorySink sink = connector.sink("send"); - InMemorySink dataStream = connector.sink("data-result"); + InMemorySink dataStream = connector.sink("messages-stream"); handler.send("hello"); await().atMost(Duration.ofMillis(1000)).untilAsserted(() -> diff --git a/pulsar/src/test/java/com/example/MessageResourceTest.java b/pulsar/src/test/java/com/example/MessageResourceTest.java index e6cadb05b..33d822a47 100644 --- a/pulsar/src/test/java/com/example/MessageResourceTest.java +++ b/pulsar/src/test/java/com/example/MessageResourceTest.java @@ -75,6 +75,6 @@ void testSendAndReceiveMessages() { } assertThat(messageReplay.size()).isEqualTo(1); - assertThat(messageReplay.get(0).body()).isEqualTo("hello"); + assertThat(messageReplay.getFirst().body()).isEqualTo("hello"); } } \ No newline at end of file diff --git a/rabbitmq/src/main/java/com/example/MessageHandler.java b/rabbitmq/src/main/java/com/example/MessageHandler.java index 74b6adaa7..a1adc2b2f 100644 --- a/rabbitmq/src/main/java/com/example/MessageHandler.java +++ b/rabbitmq/src/main/java/com/example/MessageHandler.java @@ -23,7 +23,7 @@ public void send(String message) { } @Incoming("messages") - @Outgoing("data-stream") + @Outgoing("messages-stream") @Broadcast Message receive(String message) { log.info("received: {}", message); diff --git a/rabbitmq/src/main/java/com/example/MessageResource.java b/rabbitmq/src/main/java/com/example/MessageResource.java index 3841443cc..6d65b1dd8 100644 --- a/rabbitmq/src/main/java/com/example/MessageResource.java +++ b/rabbitmq/src/main/java/com/example/MessageResource.java @@ -18,7 +18,7 @@ public class MessageResource { MessageHandler handler; @Inject - @Channel("data-stream") + @Channel("messages-stream") Flow.Publisher stream; @POST diff --git a/rabbitmq/src/test/java/com/example/InMemoryProfile.java b/rabbitmq/src/test/java/com/example/InMemoryProfile.java index 0f02f254f..18bdf2372 100644 --- a/rabbitmq/src/test/java/com/example/InMemoryProfile.java +++ b/rabbitmq/src/test/java/com/example/InMemoryProfile.java @@ -14,7 +14,7 @@ public Map getConfigOverrides() { "quarkus.rabbitmq.devservices.enabled", "false", "mp.messaging.outgoing.send.connector", "smallrye-in-memory", "mp.messaging.incoming.messages.connector", "smallrye-in-memory", - "mp.messaging.outgoing.data-stream.connector", "smallrye-in-memory" + "mp.messaging.outgoing.messages-stream.connector", "smallrye-in-memory" ); } diff --git a/rabbitmq/src/test/java/com/example/MessageHandlerTest.java b/rabbitmq/src/test/java/com/example/MessageHandlerTest.java index 9b1ce54a2..1315c4254 100644 --- a/rabbitmq/src/test/java/com/example/MessageHandlerTest.java +++ b/rabbitmq/src/test/java/com/example/MessageHandlerTest.java @@ -34,7 +34,7 @@ void setUp() { void receive() { InMemorySource messages = connector.source("messages"); InMemorySink sink = connector.sink("send"); - InMemorySink dataStream = connector.sink("data-stream"); + InMemorySink dataStream = connector.sink("messages-stream"); handler.send("hello"); await().atMost(Duration.ofMillis(1000)).untilAsserted(() ->