Skip to content

Commit

Permalink
Added functionality to allow user to add a message to a topic partiti…
Browse files Browse the repository at this point in the history
…on manually (#703)
  • Loading branch information
nguyen-tri-nhan authored Dec 7, 2024
1 parent 0ed94d0 commit 5275899
Show file tree
Hide file tree
Showing 15 changed files with 465 additions and 14 deletions.
104 changes: 100 additions & 4 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,28 @@
import kafdrop.service.MessageInspector;
import kafdrop.service.TopicNotFoundException;
import kafdrop.util.AvroMessageDeserializer;
import kafdrop.util.AvroMessageSerializer;
import kafdrop.util.DefaultMessageDeserializer;
import kafdrop.util.DefaultMessageSerializer;
import kafdrop.util.Deserializers;
import kafdrop.util.KeyFormat;
import kafdrop.util.MessageDeserializer;
import kafdrop.util.MessageFormat;
import kafdrop.util.MessageSerializer;
import kafdrop.util.MsgPackMessageDeserializer;
import kafdrop.util.MsgPackMessageSerializer;
import kafdrop.util.ProtobufMessageDeserializer;
import kafdrop.util.ProtobufMessageSerializer;
import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;


import kafdrop.util.Serializers;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
Expand All @@ -57,11 +71,10 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;

import org.springframework.web.bind.annotation.PostMapping;
import kafdrop.model.CreateMessageVO;

@Tag(name = "message-controller", description = "Message Controller")
@Controller
Expand Down Expand Up @@ -195,6 +208,60 @@ public String viewMessageForm(@PathVariable("name") String topicName,
return "message-inspector";
}

@PostMapping("/topic/{name:.+}/addmessage")
public String addMessage(
@PathVariable("name")
String topicName,
@ModelAttribute("addMessageForm") CreateMessageVO body,
Model model) {
try {
final MessageFormat defaultFormat = messageFormatProperties.getFormat();
final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat();

final var serializers = new Serializers(
getSerializer(topicName, defaultKeyFormat, "", ""),
getSerializer(topicName, defaultFormat, "", ""));
RecordMetadata recordMetadata = kafkaMonitor.publishMessage(body, serializers);

final var deserializers = new Deserializers(
getDeserializer(topicName, defaultKeyFormat, "", ""),
getDeserializer(topicName, defaultFormat, "", "")
);

final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo();

defaultForm.setCount(100l);
defaultForm.setOffset(recordMetadata.offset());
defaultForm.setPartition(body.getTopicPartition());
defaultForm.setFormat(defaultFormat);
defaultForm.setKeyFormat(defaultFormat);

model.addAttribute("messageForm", defaultForm);

final TopicVO topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));

model.addAttribute("topic", topic);

model.addAttribute("defaultFormat", defaultFormat);
model.addAttribute("messageFormats", MessageFormat.values());
model.addAttribute("defaultKeyFormat", defaultKeyFormat);
model.addAttribute("keyFormats", KeyFormat.values());
model.addAttribute("descFiles", protobufProperties.getDescFilesList());
model.addAttribute("messages",
messageInspector.getMessages(topicName,
body.getTopicPartition(),
recordMetadata.offset(),
100,
deserializers));
model.addAttribute("isAnyProtoOpts", List.of(true, false));

} catch (Exception ex) {
model.addAttribute("errorMessage", ex.getMessage());
}
return "message-inspector";
}

/**
* Human friendly view of searching messages.
*
Expand Down Expand Up @@ -339,6 +406,11 @@ List<Object> getPartitionOrMessages(
}
}

private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile,
String msgTypeName) {
return getDeserializer(topicName, format, descFile, msgTypeName, false);
}

private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile,
String msgTypeName, boolean isAnyProto) {
final MessageDeserializer deserializer;
Expand Down Expand Up @@ -370,6 +442,30 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form
return deserializer;
}

private MessageSerializer getSerializer(String topicName, MessageFormat format, String descFile, String msgTypeName) {
final MessageSerializer serializer;

if (format == MessageFormat.AVRO) {
final var schemaRegistryUrl = schemaRegistryProperties.getConnect();
final var schemaRegistryAuth = schemaRegistryProperties.getAuth();

serializer = new AvroMessageSerializer(topicName, schemaRegistryUrl, schemaRegistryAuth);
} else if (format == MessageFormat.PROTOBUF) {
// filter the input file name
final var descFileName = descFile.replace(".desc", "")
.replaceAll("\\.", "")
.replaceAll("/", "");
final var fullDescFile = protobufProperties.getDirectory() + File.separator + descFileName + ".desc";
serializer = new ProtobufMessageSerializer(fullDescFile, msgTypeName);
} else if (format == MessageFormat.MSGPACK) {
serializer = new MsgPackMessageSerializer();
} else {
serializer = new DefaultMessageSerializer();
}

return serializer;
}

/**
* Encapsulates offset data for a single partition.
*/
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/kafdrop/model/CreateMessageVO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kafdrop.model;

import lombok.Data;
import lombok.RequiredArgsConstructor;

@Data
@RequiredArgsConstructor
public final class CreateMessageVO {

private int topicPartition;

private String key;

private String value;

private String topic;
}
7 changes: 3 additions & 4 deletions src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,9 @@ synchronized SearchResults searchRecords(String topic,
List<TopicPartition> partitions = determinePartitionsForTopic(topic);
if (partition != -1) {
var partitionOpt = partitions.stream().filter(p -> p.partition() == partition).findAny();
if (partitionOpt.isEmpty()) {
throw new IllegalArgumentException("Partition does not exist in topic");
}
partitions = List.of(partitionOpt.get());
partitions = List.of(partitionOpt.orElseThrow(
() -> new IllegalArgumentException("Partition " + partition + " does not exist in topic")
));
}
kafkaConsumer.assign(partitions);
seekToTimestamp(partitions, startTimestamp);
Expand Down
65 changes: 65 additions & 0 deletions src/main/java/kafdrop/service/KafkaHighLevelProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package kafdrop.service;

import java.util.Properties;
import java.util.concurrent.Future;


import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import kafdrop.config.KafkaConfiguration;
import kafdrop.model.CreateMessageVO;
import kafdrop.util.Serializers;

@Service
public final class KafkaHighLevelProducer {

private static final Logger LOG = LoggerFactory.getLogger(KafkaHighLevelProducer.class);
private final KafkaConfiguration kafkaConfiguration;
private KafkaProducer<byte[], byte[]> kafkaProducer;

public KafkaHighLevelProducer(KafkaConfiguration kafkaConfiguration) {
this.kafkaConfiguration = kafkaConfiguration;
}

@PostConstruct
private void initializeClient() {
if (kafkaProducer == null) {
final var properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafdrop-producer");
kafkaConfiguration.applyCommon(properties);

kafkaProducer = new KafkaProducer<>(properties);
}
}

public RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers) {
initializeClient();

final ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(message.getTopic(),
message.getTopicPartition(), serializers.getKeySerializer().serializeMessage(message.getKey()),
serializers.getValueSerializer().serializeMessage(message.getValue()));

Future<RecordMetadata> result = kafkaProducer.send(record);
try {
RecordMetadata recordMetadata = result.get();
LOG.info("Record published successfully [{}]", recordMetadata);
return recordMetadata;
} catch (Exception e) {
LOG.error("Failed to publish message", e);
throw new KafkaProducerException(e);
}
}
}
6 changes: 6 additions & 0 deletions src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import kafdrop.model.BrokerVO;
import kafdrop.model.ClusterSummaryVO;
import kafdrop.model.ConsumerVO;
import kafdrop.model.CreateMessageVO;
import kafdrop.model.CreateTopicVO;
import kafdrop.model.MessageVO;
import kafdrop.model.SearchResultsVO;
import kafdrop.model.TopicVO;
import kafdrop.util.Deserializers;
import kafdrop.util.Serializers;
import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Collection;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -79,5 +83,7 @@ SearchResultsVO searchMessages(String topic,
*/
void deleteTopic(String topic);

RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers);

List<AclVO> getAcls();
}
23 changes: 18 additions & 5 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,40 @@
import kafdrop.model.AclVO;
import kafdrop.model.BrokerVO;
import kafdrop.model.ClusterSummaryVO;
import kafdrop.model.ConsumerPartitionVO;
import kafdrop.model.ConsumerTopicVO;
import kafdrop.model.ConsumerVO;
import kafdrop.model.ConsumerTopicVO;
import kafdrop.model.ConsumerPartitionVO;
import kafdrop.model.CreateMessageVO;
import kafdrop.model.CreateTopicVO;
import kafdrop.model.MessageVO;
import kafdrop.model.SearchResultsVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.model.TopicVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.util.Serializers;
import kafdrop.util.Deserializers;
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;


import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -64,9 +68,13 @@ public final class KafkaMonitorImpl implements KafkaMonitor {

private final KafkaHighLevelAdminClient highLevelAdminClient;

public KafkaMonitorImpl(KafkaHighLevelConsumer highLevelConsumer, KafkaHighLevelAdminClient highLevelAdminClient) {
private final KafkaHighLevelProducer highLevelProducer;

public KafkaMonitorImpl(KafkaHighLevelConsumer highLevelConsumer, KafkaHighLevelAdminClient highLevelAdminClient,
KafkaHighLevelProducer highLevelProducer) {
this.highLevelConsumer = highLevelConsumer;
this.highLevelAdminClient = highLevelAdminClient;
this.highLevelProducer = highLevelProducer;
}

@Override
Expand Down Expand Up @@ -419,4 +427,9 @@ private List<ConsumerGroupOffsets> getConsumerOffsets(Set<String> topics) {
.filter(not(ConsumerGroupOffsets::isEmpty))
.collect(Collectors.toList());
}

@Override
public RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers) {
return highLevelProducer.publishMessage(message, serializers);
}
}
16 changes: 16 additions & 0 deletions src/main/java/kafdrop/service/KafkaProducerException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kafdrop.service;

import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;

@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class KafkaProducerException extends RuntimeException {

public KafkaProducerException(Throwable exception) {
super(exception);
}

public KafkaProducerException(String message) {
super(message);
}
}
36 changes: 36 additions & 0 deletions src/main/java/kafdrop/util/AvroMessageSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package kafdrop.util;

import java.util.HashMap;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroMessageSerializer implements MessageSerializer {

private final String topicName;
private final KafkaAvroSerializer serializer;

public AvroMessageSerializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) {
this.topicName = topicName;
this.serializer = getSerializer(schemaRegistryUrl, schemaRegistryAuth);
}

@Override
public byte[] serializeMessage(String value) {
final var bytes = value.getBytes();
return serializer.serialize(topicName, bytes);
}

private KafkaAvroSerializer getSerializer(String schemaRegistryUrl, String schemaRegistryAuth) {
final var config = new HashMap<String, Object>();
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
if (schemaRegistryAuth != null) {
config.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
config.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
}
final var kafkaAvroSerializer = new KafkaAvroSerializer();
kafkaAvroSerializer.configure(config, false);
return kafkaAvroSerializer;
}

}
Loading

0 comments on commit 5275899

Please sign in to comment.