From df4e0aad1405bfa76e1f018d38a7cf5557de551f Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 5 May 2022 14:27:57 -0400 Subject: [PATCH] GH-2260: Reduce Header Constant Verbosity Resolves https://github.com/spring-projects/spring-kafka/issues/2260 **cherry-pick to main, removing 4 deprecated constants** --- .../src/main/asciidoc/kafka.adoc | 38 +++++++++---------- .../kafka/core/KafkaOperations.java | 4 +- .../MessagingMessageListenerAdapter.java | 6 +-- .../messaging/MessagingTransformer.java | 4 +- .../support/AbstractKafkaHeaderMapper.java | 8 ++-- .../kafka/support/KafkaHeaders.java | 21 ++++++---- .../support/converter/MessageConverter.java | 6 +-- .../converter/MessagingMessageConverter.java | 6 +-- .../BatchListenerConversionTests.java | 6 +-- .../EnableKafkaIntegrationTests.java | 28 +++++++------- .../kafka/core/KafkaTemplateTests.java | 12 +++--- ...KafkaProducerTemplateIntegrationTests.java | 8 ++-- .../ReplyingKafkaTemplateTests.java | 6 +-- .../ExistingRetryTopicIntegrationTests.java | 4 +- .../converter/BatchMessageConverterTests.java | 8 ++-- .../MessagingMessageConverterTests.java | 8 ++-- 16 files changed, 89 insertions(+), 84 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index fcecadaaac..77929c76c0 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -275,8 +275,8 @@ Then, to use the template, you can invoke one of its methods. When you use the methods with a `Message` parameter, the topic, partition, and key information is provided in a message header that includes the following items: * `KafkaHeaders.TOPIC` -* `KafkaHeaders.PARTITION_ID` -* `KafkaHeaders.MESSAGE_KEY` +* `KafkaHeaders.PARTITION` +* `KafkaHeaders.KEY` * `KafkaHeaders.TIMESTAMP` The message payload is the data. @@ -831,7 +831,7 @@ In this example, we use the reply topic header from the request: public Message messageReturn(String in) { return MessageBuilder.withPayload(in.toUpperCase()) .setHeader(KafkaHeaders.TOPIC, replyTo) - .setHeader(KafkaHeaders.MESSAGE_KEY, 42) + .setHeader(KafkaHeaders.KEY, 42) .setHeader(KafkaHeaders.CORRELATION_ID, correlation) .build(); } @@ -850,7 +850,7 @@ It will also echo the incoming `KafkaHeaders.CORRELATION_ID` and `KafkaHeaders.R @SendTo // default REPLY_TOPIC header public Message messageReturn(String in) { return MessageBuilder.withPayload(in.toUpperCase()) - .setHeader(KafkaHeaders.MESSAGE_KEY, 42) + .setHeader(KafkaHeaders.KEY, 42) .build(); } ---- @@ -1474,13 +1474,13 @@ Finally, metadata about the record is available from message headers. You can use the following header names to retrieve the headers of the message: * `KafkaHeaders.OFFSET` -* `KafkaHeaders.RECEIVED_MESSAGE_KEY` +* `KafkaHeaders.RECEIVED_KEY` * `KafkaHeaders.RECEIVED_TOPIC` -* `KafkaHeaders.RECEIVED_PARTITION_ID` +* `KafkaHeaders.RECEIVED_PARTITION` * `KafkaHeaders.RECEIVED_TIMESTAMP` * `KafkaHeaders.TIMESTAMP_TYPE` -Starting with version 2.5 the `RECEIVED_MESSAGE_KEY` is not present if the incoming record has a `null` key; previously the header was populated with a `null` value. +Starting with version 2.5 the `RECEIVED_KEY` is not present if the incoming record has a `null` key; previously the header was populated with a `null` value. This change is to make the framework consistent with `spring-messaging` conventions where `null` valued headers are not present. The following example shows how to use the headers: @@ -1490,8 +1490,8 @@ The following example shows how to use the headers: ---- @KafkaListener(id = "qux", topicPattern = "myTopic1") public void listen(@Payload String foo, - @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, + @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key, + @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts ) { @@ -1558,8 +1558,8 @@ The following example shows how to use the headers: ---- @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List list, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List keys, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions, + @Header(KafkaHeaders.RECEIVED_KEY) List keys, + @Header(KafkaHeaders.RECEIVED_PARTITION) List partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List topics, @Header(KafkaHeaders.OFFSET) List offsets) { ... @@ -2160,7 +2160,7 @@ public class MultiListenerSendTo { @KafkaHandler @SendTo("!{'annotated25reply2'}") public String bar(@Payload(required = false) KafkaNull nul, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { + @Header(KafkaHeaders.RECEIVED_KEY) int key) { ... } @@ -2258,7 +2258,7 @@ public Message listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] rep @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) { return MessageBuilder.withPayload(in.toUpperCase()) .setHeader(KafkaHeaders.TOPIC, replyTo) - .setHeader(KafkaHeaders.MESSAGE_KEY, 42) + .setHeader(KafkaHeaders.KEY, 42) .setHeader(KafkaHeaders.CORRELATION_ID, correlation) .setHeader("someOtherHeader", "someValue") .build(); @@ -4886,7 +4886,7 @@ The following example shows such a configuration: [source, java] ---- @KafkaListener(id = "deletableListener", topics = "myTopic") -public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) { +public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) { // value == null represents key deletion } ---- @@ -4913,7 +4913,7 @@ static class MultiListenerBean { } @KafkaHandler - public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { + public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) { ... } @@ -4999,7 +4999,7 @@ public ConsumerAwareListenerErrorHandler listen3ErrorHandler() { MessageHeaders headers = m.getHeaders(); c.seek(new org.apache.kafka.common.TopicPartition( headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class), - headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)), + headers.get(KafkaHeaders.RECEIVED_PARTITION, Integer.class)), headers.get(KafkaHeaders.OFFSET, Long.class)); return null; }; @@ -5018,7 +5018,7 @@ public ConsumerAwareListenerErrorHandler listen10ErrorHandler() { this.listen10Exception = e; MessageHeaders headers = m.getHeaders(); List topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class); - List partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class); + List partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION, List.class); List offsets = headers.get(KafkaHeaders.OFFSET, List.class); Map offsetsToReset = new HashMap<>(); for (int i = 0; i < topics.size(); i++) { @@ -5575,8 +5575,8 @@ For POJO batch listeners, starting with version 2.8.6, the header is copied into @KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory", info = "info for batch") public void listen(List list, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List keys, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions, + @Header(KafkaHeaders.RECEIVED_KEY) List keys, + @Header(KafkaHeaders.RECEIVED_PARTITION) List partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List topics, @Header(KafkaHeaders.OFFSET) List offsets, @Header(KafkaHeaders.LISTENER_INFO) String info) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java index ce9d265bf3..88b75abaa1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java @@ -151,8 +151,8 @@ public interface KafkaOperations { * @param message the message to send. * @return a Future for the {@link SendResult}. * @see org.springframework.kafka.support.KafkaHeaders#TOPIC - * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID - * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY + * @see org.springframework.kafka.support.KafkaHeaders#PARTITION + * @see org.springframework.kafka.support.KafkaHeaders#KEY */ ListenableFuture> send(Message message); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index e62c1f4d34..458728a747 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -479,7 +479,7 @@ private Message checkHeaders(Object result, String topic, Object source) { // boolean needsTopic = headers.get(KafkaHeaders.TOPIC) == null; boolean sourceIsMessage = source instanceof Message; boolean needsCorrelation = headers.get(KafkaHeaders.CORRELATION_ID) == null && sourceIsMessage; - boolean needsPartition = headers.get(KafkaHeaders.PARTITION_ID) == null && sourceIsMessage + boolean needsPartition = headers.get(KafkaHeaders.PARTITION) == null && sourceIsMessage && getReplyPartition((Message) source) != null; if (needsTopic || needsCorrelation || needsPartition) { MessageBuilder builder = MessageBuilder.fromMessage(reply); @@ -546,7 +546,7 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc private void setPartition(MessageBuilder builder, Message source) { byte[] partitionBytes = getReplyPartition(source); if (partitionBytes != null) { - builder.setHeader(KafkaHeaders.PARTITION_ID, ByteBuffer.wrap(partitionBytes).getInt()); + builder.setHeader(KafkaHeaders.PARTITION, ByteBuffer.wrap(partitionBytes).getInt()); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java b/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java index 48c497a34b..bef1c2d104 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -92,7 +92,7 @@ public KeyValue transform(K key, V value) { headers.add(header); } }); - Object key2 = message.getHeaders().get(KafkaHeaders.MESSAGE_KEY); + Object key2 = message.getHeaders().get(KafkaHeaders.KEY); return new KeyValue(key2 == null ? key : key2, message.getPayload()); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index 4d7dfb06bc..4971cc1c67 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -68,12 +68,12 @@ public AbstractKafkaHeaderMapper(String... patterns) { this.matchers.add(new NeverMatchHeaderMatcher( KafkaHeaders.ACKNOWLEDGMENT, KafkaHeaders.CONSUMER, - KafkaHeaders.MESSAGE_KEY, + KafkaHeaders.KEY, KafkaHeaders.OFFSET, - KafkaHeaders.PARTITION_ID, + KafkaHeaders.PARTITION, KafkaHeaders.RAW_DATA, - KafkaHeaders.RECEIVED_MESSAGE_KEY, - KafkaHeaders.RECEIVED_PARTITION_ID, + KafkaHeaders.RECEIVED_KEY, + KafkaHeaders.RECEIVED_PARTITION, KafkaHeaders.RECEIVED_TIMESTAMP, KafkaHeaders.RECEIVED_TOPIC, KafkaHeaders.TIMESTAMP, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java index 5472161d50..3e5ee86781 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java @@ -42,14 +42,16 @@ public abstract class KafkaHeaders { public static final String TOPIC = PREFIX + "topic"; /** - * The header containing the message key when sending data to Kafka. + * The header containing the record key when sending data to Kafka. + * @since 2.9 */ - public static final String MESSAGE_KEY = PREFIX + "messageKey"; + public static final String KEY = PREFIX + "messageKey"; /** * The header containing the topic partition when sending data to Kafka. + * @since 2.0 */ - public static final String PARTITION_ID = PREFIX + "partitionId"; + public static final String PARTITION = PREFIX + "partitionId"; /** * The header for the partition offset. @@ -83,17 +85,20 @@ public abstract class KafkaHeaders { public static final String RECEIVED_TOPIC = RECEIVED + "Topic"; /** - * The header containing the message key for the received message. + * The header containing the record key from the received message. + * @since 2.9 */ - public static final String RECEIVED_MESSAGE_KEY = RECEIVED + "MessageKey"; + public static final String RECEIVED_KEY = RECEIVED + "MessageKey"; /** - * The header containing the topic partition for the received message. + * The header containing the topic partition from the received message. + * @since 2.9 */ - public static final String RECEIVED_PARTITION_ID = RECEIVED + "PartitionId"; + public static final String RECEIVED_PARTITION = RECEIVED + "PartitionId"; /** - * The header for holding the {@link org.apache.kafka.common.record.TimestampType type} of timestamp. + * The header for holding the {@link org.apache.kafka.common.record.TimestampType + * type} of timestamp. */ public static final String TIMESTAMP_TYPE = PREFIX + "timestampType"; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java index ba978b2986..0d31e685a6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,12 +61,12 @@ default void commonHeaders(Acknowledgment acknowledgment, Consumer consume @Nullable Object timestampType, Object timestamp) { rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic); - rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partition); + rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION, partition); rawHeaders.put(KafkaHeaders.OFFSET, offset); rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType); rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp); JavaUtils.INSTANCE - .acceptIfNotNull(KafkaHeaders.RECEIVED_MESSAGE_KEY, theKey, (key, val) -> rawHeaders.put(key, val)) + .acceptIfNotNull(KafkaHeaders.RECEIVED_KEY, theKey, (key, val) -> rawHeaders.put(key, val)) .acceptIfNotNull(KafkaHeaders.GROUP_ID, MessageConverter.getGroupId(), (key, val) -> rawHeaders.put(key, val)) .acceptIfNotNull(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment, (key, val) -> rawHeaders.put(key, val)) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index c7a4643a38..5a8d9ae916 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -227,8 +227,8 @@ else if (topicHeader == null) { throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not " + topicHeader.getClass()); } - Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class); - Object key = headers.get(KafkaHeaders.MESSAGE_KEY); + Integer partition = headers.get(KafkaHeaders.PARTITION, Integer.class); + Object key = headers.get(KafkaHeaders.KEY); Object payload = convertPayload(message); Long timestamp = headers.get(KafkaHeaders.TIMESTAMP, Long.class); Headers recordHeaders = initialRecordHeaders(message); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java index 2a0a8a724a..79eb35d9fb 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -266,7 +266,7 @@ public KafkaListenerContainerFactory getContainerFactory() { containerFactory = "#{__listener.containerFactory}") // @SendTo("foo") test WARN log for void return public void listen1(List foos, @Header(KafkaHeaders.RECEIVED_TOPIC) List topics, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions) { + @Header(KafkaHeaders.RECEIVED_PARTITION) List partitions) { if (this.received == null) { this.received = foos; } @@ -318,7 +318,7 @@ public Collection> listen1(List foos) { } return foos.stream().map(f -> MessageBuilder.withPayload(new Foo(f.getBar().toUpperCase())) .setHeader(KafkaHeaders.TOPIC, "blc5") - .setHeader(KafkaHeaders.MESSAGE_KEY, 42) + .setHeader(KafkaHeaders.KEY, 42) .build()) .collect(Collectors.toList()); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index a33ea2c6ca..9007acbd0a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -492,8 +492,8 @@ public void testJson() throws Exception { Foo foo = new Foo("bar"); kafkaJsonTemplate.send(MessageBuilder.withPayload(foo) .setHeader(KafkaHeaders.TOPIC, "annotated10") - .setHeader(KafkaHeaders.PARTITION_ID, 0) - .setHeader(KafkaHeaders.MESSAGE_KEY, 2) + .setHeader(KafkaHeaders.PARTITION, 0) + .setHeader(KafkaHeaders.KEY, 2) .build()); assertThat(this.listener.latch6.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.foo.getBar()).isEqualTo("bar"); @@ -523,8 +523,8 @@ public void testJsonHeaders() throws Exception { Foo foo = new Foo("bar"); this.kafkaJsonTemplate.send(MessageBuilder.withPayload(foo) .setHeader(KafkaHeaders.TOPIC, "annotated31") - .setHeader(KafkaHeaders.PARTITION_ID, 0) - .setHeader(KafkaHeaders.MESSAGE_KEY, 2) + .setHeader(KafkaHeaders.PARTITION, 0) + .setHeader(KafkaHeaders.KEY, 2) .build()); assertThat(this.listener.latch19.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.foo.getBar()).isEqualTo("bar"); @@ -1519,7 +1519,7 @@ public ConsumerAwareListenerErrorHandler listen3ErrorHandler() { MessageHeaders headers = m.getHeaders(); c.seek(new org.apache.kafka.common.TopicPartition( headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class), - headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)), + headers.get(KafkaHeaders.RECEIVED_PARTITION, Integer.class)), headers.get(KafkaHeaders.OFFSET, Long.class)); return null; }; @@ -1557,7 +1557,7 @@ protected void resetAllOffsets(Message message, Consumer consumer) { return; } @SuppressWarnings("unchecked") - List partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class); + List partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION, List.class); @SuppressWarnings("unchecked") List offsets = headers.get(KafkaHeaders.OFFSET, List.class); Map offsetsToReset = new HashMap<>(); @@ -1868,8 +1868,8 @@ public void listen1Batch(List foo) { info = "#{@barInfo}") public void listen2(@Payload String foo, @Header(KafkaHeaders.GROUP_ID) String groupId, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, + @Header(KafkaHeaders.RECEIVED_KEY) Integer key, + @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) { this.key = key; @@ -1984,8 +1984,8 @@ public void listen10(List list, @Header(KafkaHeaders.GROUP_ID) String gr @KafkaListener(id = "list2", topics = "annotated15", containerFactory = "batchFactory", info = "info for batch") public void listen11(List list, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List keys, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions, + @Header(KafkaHeaders.RECEIVED_KEY) List keys, + @Header(KafkaHeaders.RECEIVED_PARTITION) List partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List topics, @Header(KafkaHeaders.OFFSET) List offsets, @Header(KafkaHeaders.LISTENER_INFO) String info) { @@ -2089,7 +2089,7 @@ public Collection replyingBatchListenerCollection(List in) { @KafkaListener(id = "batchAckListener", topics = { "annotated26", "annotated27" }, containerFactory = "batchFactory") public void batchAckListener(@SuppressWarnings("unused") List in, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitionsHeader, + @Header(KafkaHeaders.RECEIVED_PARTITION) List partitionsHeader, @Header(KafkaHeaders.RECEIVED_TOPIC) List topicsHeader, Consumer consumer) { @@ -2126,7 +2126,7 @@ public void ackWithAutoContainerListener(String payload, Acknowledgment ack) { @KafkaListener(id = "bytesKey", topics = "annotated36", clientIdPrefix = "tag", containerFactory = "bytesStringListenerContainerFactory") - public void bytesKey(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) { + public void bytesKey(String in, @Header(KafkaHeaders.RECEIVED_KEY) String key) { this.convertedKey = key; this.keyLatch.countDown(); } @@ -2321,7 +2321,7 @@ public void bar(@NonNull String bar) { @KafkaHandler @SendTo("#{'${foo:annotated8reply}'}") - public String bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { + public String bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) { this.latch2.countDown(); return "OK"; } @@ -2403,7 +2403,7 @@ public String foo(String in) { @KafkaHandler @SendTo("!{'annotated25reply2'}") public String bar(@Payload(required = false) KafkaNull nul, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { + @Header(KafkaHeaders.RECEIVED_KEY) int key) { return "BAR"; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index e323b9436d..4c5bbb0370 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -190,15 +190,15 @@ void testTemplate() { template.send(MessageBuilder.withPayload("fiz") .setHeader(KafkaHeaders.TOPIC, INT_KEY_TOPIC) - .setHeader(KafkaHeaders.PARTITION_ID, 0) - .setHeader(KafkaHeaders.MESSAGE_KEY, 2) + .setHeader(KafkaHeaders.PARTITION, 0) + .setHeader(KafkaHeaders.KEY, 2) .build()); received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC); assertThat(received).has(allOf(keyValue(2, "fiz"), partition(0))); template.send(MessageBuilder.withPayload("buz") - .setHeader(KafkaHeaders.PARTITION_ID, 1) - .setHeader(KafkaHeaders.MESSAGE_KEY, 2) + .setHeader(KafkaHeaders.PARTITION, 1) + .setHeader(KafkaHeaders.KEY, 2) .build()); received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC); assertThat(received).has(allOf(keyValue(2, "buz"), partition(1))); @@ -271,7 +271,7 @@ void testWithMessage() { Message message1 = MessageBuilder.withPayload("foo-message") .setHeader(KafkaHeaders.TOPIC, INT_KEY_TOPIC) - .setHeader(KafkaHeaders.PARTITION_ID, 0) + .setHeader(KafkaHeaders.PARTITION, 0) .setHeader("foo", "bar") .setHeader(KafkaHeaders.RECEIVED_TOPIC, "dummy") .build(); @@ -292,7 +292,7 @@ void testWithMessage() { Message message2 = MessageBuilder.withPayload("foo-message-2") .setHeader(KafkaHeaders.TOPIC, INT_KEY_TOPIC) - .setHeader(KafkaHeaders.PARTITION_ID, 0) + .setHeader(KafkaHeaders.PARTITION, 0) .setHeader(KafkaHeaders.TIMESTAMP, 1487694048615L) .setHeader("foo", "bar") .build(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java index d85174058b..fd63140bce 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -239,7 +239,7 @@ public void shouldSendSingleRecordAsPartitionTimestampKeyValueAndReceiveIt() { @Test public void shouldSendSingleRecordAsProducerRecordAndReceiveIt() { List
producerRecordHeaders = convertToKafkaHeaders( - new SimpleImmutableEntry<>(KafkaHeaders.PARTITION_ID, 0), + new SimpleImmutableEntry<>(KafkaHeaders.PARTITION, 0), new SimpleImmutableEntry<>("foo", "bar"), new SimpleImmutableEntry<>(KafkaHeaders.RECEIVED_TOPIC, "dummy")); @@ -273,7 +273,7 @@ public void shouldSendSingleRecordAsProducerRecordAndReceiveIt() { @Test public void shouldSendSingleRecordAsSenderRecordAndReceiveIt() { List
producerRecordHeaders = convertToKafkaHeaders( - new SimpleImmutableEntry<>(KafkaHeaders.PARTITION_ID, 0), + new SimpleImmutableEntry<>(KafkaHeaders.PARTITION, 0), new SimpleImmutableEntry<>("foo", "bar"), new SimpleImmutableEntry<>(KafkaHeaders.RECEIVED_TOPIC, "dummy")); @@ -310,7 +310,7 @@ public void shouldSendSingleRecordAsSenderRecordAndReceiveIt() { @Test public void shouldSendSingleRecordAsMessageAndReceiveIt() { Message message = MessageBuilder.withPayload(DEFAULT_VALUE) - .setHeader(KafkaHeaders.PARTITION_ID, 0) + .setHeader(KafkaHeaders.PARTITION, 0) .setHeader("foo", "bar") .setHeader(KafkaHeaders.RECEIVED_TOPIC, "dummy") .build(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index a24dba11d2..f8d166bbf2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -859,7 +859,7 @@ public void gListener(Message in) { @SendTo // default REPLY_TOPIC header public Message messageReturn(String in) { return MessageBuilder.withPayload(in.toUpperCase()) - .setHeader(KafkaHeaders.MESSAGE_KEY, 42) + .setHeader(KafkaHeaders.KEY, 42) .build(); } @@ -895,7 +895,7 @@ public Message listen1(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] re return MessageBuilder.withPayload(in.toUpperCase()) .setHeader(KafkaHeaders.TOPIC, replyTo) - .setHeader(KafkaHeaders.MESSAGE_KEY, 42) + .setHeader(KafkaHeaders.KEY, 42) .setHeader(KafkaHeaders.CORRELATION_ID, correlation) .build(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java index 611a78a987..22cc73870e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java @@ -162,7 +162,7 @@ static class MainTopicListener { @KafkaListener(id = "firstTopicId", topics = MAIN_TOPIC_WITH_NO_PARTITION_INFO, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) public void listenFirst(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, @Header(KafkaHeaders.ORIGINAL_PARTITION) String originalPartition, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String receivedPartition) { + @Header(KafkaHeaders.RECEIVED_PARTITION) String receivedPartition) { logger.debug("Message {} received in topic {}. originalPartition: {}, receivedPartition: {}", message, receivedTopic, originalPartition, receivedPartition); @@ -182,7 +182,7 @@ public void listenFirst(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) Str @KafkaListener(id = "secondTopicId", topics = MAIN_TOPIC_WITH_PARTITION_INFO, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) public void listenSecond(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, @Header(KafkaHeaders.ORIGINAL_PARTITION) String originalPartition, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String receivedPartition) { + @Header(KafkaHeaders.RECEIVED_PARTITION) String receivedPartition) { logger.debug("Message {} received in topic {}. originalPartition: {}, receivedPartition: {}", message, receivedTopic, originalPartition, receivedPartition); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java index 5af7a8b758..1e7921194a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -115,9 +115,9 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { MessageHeaders headers = message.getHeaders(); assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)) .isEqualTo(Arrays.asList("topic1", "topic1", "topic1")); - assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)) + assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)) .isEqualTo(Arrays.asList("key1", "key2", "key3")); - assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)) + assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)) .isEqualTo(Arrays.asList(0, 0, 0)); assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(Arrays.asList(1L, 2L, 3L)); assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)) @@ -154,7 +154,7 @@ public void missingHeaders() { Message message = converter.toMessage(records, null, null, null); assertThat(((List) message.getPayload())).contains("baz"); assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, List.class)).contains("foo"); - assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class)).contains("bar"); + assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_KEY, List.class)).contains("bar"); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java index 752609fdce..330b69a9dc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,7 +55,7 @@ void missingHeaders() { Message message = converter.toMessage(record, null, null, null); assertThat(message.getPayload()).isEqualTo("baz"); assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo"); - assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar"); + assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_KEY)).isEqualTo("bar"); assertThat(message.getHeaders().get(KafkaHeaders.RAW_DATA)).isNull(); } @@ -67,7 +67,7 @@ void dontMapNullKey() { Message message = converter.toMessage(record, null, null, null); assertThat(message.getPayload()).isEqualTo("baz"); assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo"); - assertThat(message.getHeaders().containsKey(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isFalse(); + assertThat(message.getHeaders().containsKey(KafkaHeaders.RECEIVED_KEY)).isFalse(); } @Test @@ -79,7 +79,7 @@ void raw() { Message message = converter.toMessage(record, null, null, null); assertThat(message.getPayload()).isEqualTo("baz"); assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo"); - assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar"); + assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_KEY)).isEqualTo("bar"); assertThat(message.getHeaders().get(KafkaHeaders.RAW_DATA)).isSameAs(record); }