GH-2260: Reduce Header Constant Verbosity
Resolves #2260

**cherry-pick to main, removing 4 deprecated constants**
garyrussell authored and artembilan committed May 5, 2022
1 parent a957b39 commit df4e0aa
Showing 16 changed files with 89 additions and 84 deletions.
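In user code the change amounts to a one-for-one constant substitution; a minimal before/after sketch of the rename this diff applies (the listener below is illustrative, not taken from the commit):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

public class RenamedHeaderListener {

    // Before this commit:
    //   @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
    //   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition
    // After: shorter constants; the underlying header names are unchanged.
    @KafkaListener(id = "renamedHeaders", topics = "myTopic")
    public void listen(String value,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
        // handle the record
    }
}
```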
38 changes: 19 additions & 19 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
@@ -275,8 +275,8 @@ Then, to use the template, you can invoke one of its methods.
When you use the methods with a `Message<?>` parameter, the topic, partition, and key information is provided in a message header that includes the following items:

* `KafkaHeaders.TOPIC`
-* `KafkaHeaders.PARTITION_ID`
-* `KafkaHeaders.MESSAGE_KEY`
+* `KafkaHeaders.PARTITION`
+* `KafkaHeaders.KEY`
* `KafkaHeaders.TIMESTAMP`

The message payload is the data.
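For reference, a minimal sketch of such a `Message<?>`-based send using those headers (the topic name and the `KafkaTemplate<String, String>` named `template` are illustrative assumptions, not part of this commit):

[source, java]
----
Message<String> message = MessageBuilder.withPayload("some payload")
        .setHeader(KafkaHeaders.TOPIC, "myTopic")
        .setHeader(KafkaHeaders.PARTITION, 0)
        .setHeader(KafkaHeaders.KEY, "someKey")
        .build();

template.send(message); // returns a ListenableFuture<SendResult<String, String>>
----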
@@ -831,7 +831,7 @@ In this example, we use the reply topic header from the request:
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
-.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
+.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
@@ -850,7 +850,7 @@ It will also echo the incoming `KafkaHeaders.CORRELATION_ID` and `KafkaHeaders.R
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
-.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
+.setHeader(KafkaHeaders.KEY, 42)
.build();
}
----
@@ -1474,13 +1474,13 @@ Finally, metadata about the record is available from message headers.
You can use the following header names to retrieve the headers of the message:

* `KafkaHeaders.OFFSET`
-* `KafkaHeaders.RECEIVED_MESSAGE_KEY`
+* `KafkaHeaders.RECEIVED_KEY`
* `KafkaHeaders.RECEIVED_TOPIC`
-* `KafkaHeaders.RECEIVED_PARTITION_ID`
+* `KafkaHeaders.RECEIVED_PARTITION`
* `KafkaHeaders.RECEIVED_TIMESTAMP`
* `KafkaHeaders.TIMESTAMP_TYPE`

-Starting with version 2.5 the `RECEIVED_MESSAGE_KEY` is not present if the incoming record has a `null` key; previously the header was populated with a `null` value.
+Starting with version 2.5 the `RECEIVED_KEY` is not present if the incoming record has a `null` key; previously the header was populated with a `null` value.
This change is to make the framework consistent with `spring-messaging` conventions where `null` valued headers are not present.

The following example shows how to use the headers:
@@ -1490,8 +1490,8 @@ The following example shows how to use the headers:
----
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
-@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
-@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
+@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
+@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
@@ -1558,8 +1558,8 @@ The following example shows how to use the headers:
----
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
-@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
-@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
+@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
+@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
@@ -2160,7 +2160,7 @@ public class MultiListenerSendTo {
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
-@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
+@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
@@ -2258,7 +2258,7 @@ public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] rep
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
-.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
+.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader("someOtherHeader", "someValue")
.build();
@@ -4886,7 +4886,7 @@ The following example shows such a configuration:
[source, java]
----
@KafkaListener(id = "deletableListener", topics = "myTopic")
-public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
+public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
----
@@ -4913,7 +4913,7 @@ static class MultiListenerBean {
}
@KafkaHandler
-public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
+public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
@@ -4999,7 +4999,7 @@ public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
-headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
+headers.get(KafkaHeaders.RECEIVED_PARTITION, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
@@ -5018,7 +5018,7 @@ public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
this.listen10Exception = e;
MessageHeaders headers = m.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
-List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
+List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
@@ -5575,8 +5575,8 @@ For POJO batch listeners, starting with version 2.8.6, the header is copied into
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
-@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
-@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
+@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
+@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
@@ -151,8 +151,8 @@ public interface KafkaOperations<K, V> {
* @param message the message to send.
* @return a Future for the {@link SendResult}.
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
-* @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
-* @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
+* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
+* @see org.springframework.kafka.support.KafkaHeaders#KEY
*/
ListenableFuture<SendResult<K, V>> send(Message<?> message);
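A usage sketch for the `Message<?>`-based send documented above, reacting to the returned `ListenableFuture<SendResult<K, V>>` (the `template` variable and the callback bodies are illustrative assumptions, not part of this commit):

[source, java]
----
ListenableFuture<SendResult<String, String>> future = template.send(
        MessageBuilder.withPayload("some payload")
                .setHeader(KafkaHeaders.TOPIC, "myTopic")
                .setHeader(KafkaHeaders.KEY, "someKey")
                .build());
future.addCallback(
        result -> System.out.println("Sent, offset " + result.getRecordMetadata().offset()),
        ex -> ex.printStackTrace());
----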

@@ -1,5 +1,5 @@
/*
-* Copyright 2016-2021 the original author or authors.
+* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -479,7 +479,7 @@ private Message<?> checkHeaders(Object result, String topic, Object source) { //
boolean needsTopic = headers.get(KafkaHeaders.TOPIC) == null;
boolean sourceIsMessage = source instanceof Message;
boolean needsCorrelation = headers.get(KafkaHeaders.CORRELATION_ID) == null && sourceIsMessage;
-boolean needsPartition = headers.get(KafkaHeaders.PARTITION_ID) == null && sourceIsMessage
+boolean needsPartition = headers.get(KafkaHeaders.PARTITION) == null && sourceIsMessage
&& getReplyPartition((Message<?>) source) != null;
if (needsTopic || needsCorrelation || needsPartition) {
MessageBuilder<?> builder = MessageBuilder.fromMessage(reply);
@@ -546,7 +546,7 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
private void setPartition(MessageBuilder<?> builder, Message<?> source) {
byte[] partitionBytes = getReplyPartition(source);
if (partitionBytes != null) {
-builder.setHeader(KafkaHeaders.PARTITION_ID, ByteBuffer.wrap(partitionBytes).getInt());
+builder.setHeader(KafkaHeaders.PARTITION, ByteBuffer.wrap(partitionBytes).getInt());
}
}

@@ -1,5 +1,5 @@
/*
-* Copyright 2019-2021 the original author or authors.
+* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -92,7 +92,7 @@ public KeyValue<K, R> transform(K key, V value) {
headers.add(header);
}
});
-Object key2 = message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);
+Object key2 = message.getHeaders().get(KafkaHeaders.KEY);
return new KeyValue(key2 == null ? key : key2, message.getPayload());
}

@@ -68,12 +68,12 @@ public AbstractKafkaHeaderMapper(String... patterns) {
this.matchers.add(new NeverMatchHeaderMatcher(
KafkaHeaders.ACKNOWLEDGMENT,
KafkaHeaders.CONSUMER,
-KafkaHeaders.MESSAGE_KEY,
+KafkaHeaders.KEY,
KafkaHeaders.OFFSET,
-KafkaHeaders.PARTITION_ID,
+KafkaHeaders.PARTITION,
KafkaHeaders.RAW_DATA,
-KafkaHeaders.RECEIVED_MESSAGE_KEY,
-KafkaHeaders.RECEIVED_PARTITION_ID,
+KafkaHeaders.RECEIVED_KEY,
+KafkaHeaders.RECEIVED_PARTITION,
KafkaHeaders.RECEIVED_TIMESTAMP,
KafkaHeaders.RECEIVED_TOPIC,
KafkaHeaders.TIMESTAMP,
@@ -42,14 +42,16 @@ public abstract class KafkaHeaders {
public static final String TOPIC = PREFIX + "topic";

/**
-* The header containing the message key when sending data to Kafka.
+* The header containing the record key when sending data to Kafka.
+* @since 2.9
*/
-public static final String MESSAGE_KEY = PREFIX + "messageKey";
+public static final String KEY = PREFIX + "messageKey";

/**
* The header containing the topic partition when sending data to Kafka.
+* @since 2.0
*/
-public static final String PARTITION_ID = PREFIX + "partitionId";
+public static final String PARTITION = PREFIX + "partitionId";

/**
* The header for the partition offset.
@@ -83,17 +85,20 @@ public abstract class KafkaHeaders {
public static final String RECEIVED_TOPIC = RECEIVED + "Topic";

/**
-* The header containing the message key for the received message.
+* The header containing the record key from the received message.
+* @since 2.9
*/
-public static final String RECEIVED_MESSAGE_KEY = RECEIVED + "MessageKey";
+public static final String RECEIVED_KEY = RECEIVED + "MessageKey";

/**
-* The header containing the topic partition for the received message.
+* The header containing the topic partition from the received message.
+* @since 2.9
*/
-public static final String RECEIVED_PARTITION_ID = RECEIVED + "PartitionId";
+public static final String RECEIVED_PARTITION = RECEIVED + "PartitionId";

/**
-* The header for holding the {@link org.apache.kafka.common.record.TimestampType type} of timestamp.
+* The header for holding the {@link org.apache.kafka.common.record.TimestampType
+* type} of timestamp.
*/
public static final String TIMESTAMP_TYPE = PREFIX + "timestampType";

@@ -1,5 +1,5 @@
/*
-* Copyright 2016-2019 the original author or authors.
+* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -61,12 +61,12 @@ default void commonHeaders(Acknowledgment acknowledgment, Consumer<?, ?> consume
@Nullable Object timestampType, Object timestamp) {

rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic);
-rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partition);
+rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION, partition);
rawHeaders.put(KafkaHeaders.OFFSET, offset);
rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType);
rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp);
JavaUtils.INSTANCE
-.acceptIfNotNull(KafkaHeaders.RECEIVED_MESSAGE_KEY, theKey, (key, val) -> rawHeaders.put(key, val))
+.acceptIfNotNull(KafkaHeaders.RECEIVED_KEY, theKey, (key, val) -> rawHeaders.put(key, val))
.acceptIfNotNull(KafkaHeaders.GROUP_ID, MessageConverter.getGroupId(),
(key, val) -> rawHeaders.put(key, val))
.acceptIfNotNull(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment, (key, val) -> rawHeaders.put(key, val))
@@ -1,5 +1,5 @@
/*
-* Copyright 2016-2021 the original author or authors.
+* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -227,8 +227,8 @@ else if (topicHeader == null) {
throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not "
+ topicHeader.getClass());
}
-Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class);
-Object key = headers.get(KafkaHeaders.MESSAGE_KEY);
+Integer partition = headers.get(KafkaHeaders.PARTITION, Integer.class);
+Object key = headers.get(KafkaHeaders.KEY);
Object payload = convertPayload(message);
Long timestamp = headers.get(KafkaHeaders.TIMESTAMP, Long.class);
Headers recordHeaders = initialRecordHeaders(message);
@@ -1,5 +1,5 @@
/*
-* Copyright 2017-2021 the original author or authors.
+* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -266,7 +266,7 @@ public KafkaListenerContainerFactory<?> getContainerFactory() {
containerFactory = "#{__listener.containerFactory}")
// @SendTo("foo") test WARN log for void return
public void listen1(List<Foo> foos, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
-@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions) {
+@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions) {
if (this.received == null) {
this.received = foos;
}
@@ -318,7 +318,7 @@ public Collection<Message<?>> listen1(List<Foo> foos) {
}
return foos.stream().map(f -> MessageBuilder.withPayload(new Foo(f.getBar().toUpperCase()))
.setHeader(KafkaHeaders.TOPIC, "blc5")
-.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
+.setHeader(KafkaHeaders.KEY, 42)
.build())
.collect(Collectors.toList());
}