diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index a4a825be..d3542f7d 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -39,14 +39,28 @@ import kafdrop.service.MessageInspector; import kafdrop.service.TopicNotFoundException; import kafdrop.util.AvroMessageDeserializer; +import kafdrop.util.AvroMessageSerializer; import kafdrop.util.DefaultMessageDeserializer; +import kafdrop.util.DefaultMessageSerializer; import kafdrop.util.Deserializers; import kafdrop.util.KeyFormat; import kafdrop.util.MessageDeserializer; import kafdrop.util.MessageFormat; +import kafdrop.util.MessageSerializer; import kafdrop.util.MsgPackMessageDeserializer; +import kafdrop.util.MsgPackMessageSerializer; import kafdrop.util.ProtobufMessageDeserializer; +import kafdrop.util.ProtobufMessageSerializer; import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer; + +import java.io.File; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + + +import kafdrop.util.Serializers; +import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; @@ -57,11 +71,10 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; -import java.io.File; -import java.util.ArrayList; -import java.util.Comparator; import java.util.Date; -import java.util.List; + +import org.springframework.web.bind.annotation.PostMapping; +import kafdrop.model.CreateMessageVO; @Tag(name = "message-controller", description = "Message Controller") @Controller @@ -195,6 +208,60 @@ public String viewMessageForm(@PathVariable("name") String topicName, return "message-inspector"; } + @PostMapping("/topic/{name:.+}/addmessage") + public String addMessage( + @PathVariable("name") + String topicName, + @ModelAttribute("addMessageForm") CreateMessageVO body, + Model model) { + try { + final MessageFormat defaultFormat = messageFormatProperties.getFormat(); + final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat(); + + final var serializers = new Serializers( + getSerializer(topicName, defaultKeyFormat, "", ""), + getSerializer(topicName, defaultFormat, "", "")); + RecordMetadata recordMetadata = kafkaMonitor.publishMessage(body, serializers); + + final var deserializers = new Deserializers( + getDeserializer(topicName, defaultKeyFormat, "", ""), + getDeserializer(topicName, defaultFormat, "", "") + ); + + final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo(); + + defaultForm.setCount(100l); + defaultForm.setOffset(recordMetadata.offset()); + defaultForm.setPartition(body.getTopicPartition()); + defaultForm.setFormat(defaultFormat); + defaultForm.setKeyFormat(defaultFormat); + + model.addAttribute("messageForm", defaultForm); + + final TopicVO topic = kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + + model.addAttribute("topic", topic); + + model.addAttribute("defaultFormat", defaultFormat); + model.addAttribute("messageFormats", MessageFormat.values()); + model.addAttribute("defaultKeyFormat", defaultKeyFormat); + model.addAttribute("keyFormats", KeyFormat.values()); + model.addAttribute("descFiles", protobufProperties.getDescFilesList()); + model.addAttribute("messages", + messageInspector.getMessages(topicName, + body.getTopicPartition(), + recordMetadata.offset(), + 100, + deserializers)); + model.addAttribute("isAnyProtoOpts", List.of(true, false)); + + } catch (Exception ex) { + model.addAttribute("errorMessage", ex.getMessage()); + } + return "message-inspector"; + } + /** * Human friendly view of searching messages. * @@ -339,6 +406,11 @@ List getPartitionOrMessages( } } + private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile, + String msgTypeName) { + return getDeserializer(topicName, format, descFile, msgTypeName, false); + } + private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile, String msgTypeName, boolean isAnyProto) { final MessageDeserializer deserializer; @@ -370,6 +442,30 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form return deserializer; } + private MessageSerializer getSerializer(String topicName, MessageFormat format, String descFile, String msgTypeName) { + final MessageSerializer serializer; + + if (format == MessageFormat.AVRO) { + final var schemaRegistryUrl = schemaRegistryProperties.getConnect(); + final var schemaRegistryAuth = schemaRegistryProperties.getAuth(); + + serializer = new AvroMessageSerializer(topicName, schemaRegistryUrl, schemaRegistryAuth); + } else if (format == MessageFormat.PROTOBUF) { + // filter the input file name + final var descFileName = descFile.replace(".desc", "") + .replaceAll("\\.", "") + .replaceAll("/", ""); + final var fullDescFile = protobufProperties.getDirectory() + File.separator + descFileName + ".desc"; + serializer = new ProtobufMessageSerializer(fullDescFile, msgTypeName); + } else if (format == MessageFormat.MSGPACK) { + serializer = new MsgPackMessageSerializer(); + } else { + serializer = new DefaultMessageSerializer(); + } + + return serializer; + } + /** * Encapsulates offset data for a single partition. */ diff --git a/src/main/java/kafdrop/model/CreateMessageVO.java b/src/main/java/kafdrop/model/CreateMessageVO.java new file mode 100644 index 00000000..bf9d7211 --- /dev/null +++ b/src/main/java/kafdrop/model/CreateMessageVO.java @@ -0,0 +1,17 @@ +package kafdrop.model; + +import lombok.Data; +import lombok.RequiredArgsConstructor; + +@Data +@RequiredArgsConstructor +public final class CreateMessageVO { + + private int topicPartition; + + private String key; + + private String value; + + private String topic; +} diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 26300d26..f9697553 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -212,10 +212,9 @@ synchronized SearchResults searchRecords(String topic, List partitions = determinePartitionsForTopic(topic); if (partition != -1) { var partitionOpt = partitions.stream().filter(p -> p.partition() == partition).findAny(); - if (partitionOpt.isEmpty()) { - throw new IllegalArgumentException("Partition does not exist in topic"); - } - partitions = List.of(partitionOpt.get()); + partitions = List.of(partitionOpt.orElseThrow( + () -> new IllegalArgumentException("Partition " + partition + " does not exist in topic") + )); } kafkaConsumer.assign(partitions); seekToTimestamp(partitions, startTimestamp); diff --git a/src/main/java/kafdrop/service/KafkaHighLevelProducer.java b/src/main/java/kafdrop/service/KafkaHighLevelProducer.java new file mode 100644 index 00000000..b6fdeeb6 --- /dev/null +++ b/src/main/java/kafdrop/service/KafkaHighLevelProducer.java @@ -0,0 +1,65 @@ +package kafdrop.service; + +import java.util.Properties; +import java.util.concurrent.Future; + + +import jakarta.annotation.PostConstruct; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import kafdrop.config.KafkaConfiguration; +import kafdrop.model.CreateMessageVO; +import kafdrop.util.Serializers; + +@Service +public final class KafkaHighLevelProducer { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaHighLevelProducer.class); + private final KafkaConfiguration kafkaConfiguration; + private KafkaProducer kafkaProducer; + + public KafkaHighLevelProducer(KafkaConfiguration kafkaConfiguration) { + this.kafkaConfiguration = kafkaConfiguration; + } + + @PostConstruct + private void initializeClient() { + if (kafkaProducer == null) { + final var properties = new Properties(); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + properties.put(ProducerConfig.ACKS_CONFIG, "all"); + properties.put(ProducerConfig.RETRIES_CONFIG, 0); + properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); + properties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafdrop-producer"); + kafkaConfiguration.applyCommon(properties); + + kafkaProducer = new KafkaProducer<>(properties); + } + } + + public RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers) { + initializeClient(); + + final ProducerRecord record = new ProducerRecord(message.getTopic(), + message.getTopicPartition(), serializers.getKeySerializer().serializeMessage(message.getKey()), + serializers.getValueSerializer().serializeMessage(message.getValue())); + + Future result = kafkaProducer.send(record); + try { + RecordMetadata recordMetadata = result.get(); + LOG.info("Record published successfully [{}]", recordMetadata); + return recordMetadata; + } catch (Exception e) { + LOG.error("Failed to publish message", e); + throw new KafkaProducerException(e); + } + } +} diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index d7f66f7c..ddb97282 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -22,13 +22,17 @@ import kafdrop.model.BrokerVO; import kafdrop.model.ClusterSummaryVO; import kafdrop.model.ConsumerVO; +import kafdrop.model.CreateMessageVO; import kafdrop.model.CreateTopicVO; import kafdrop.model.MessageVO; import kafdrop.model.SearchResultsVO; import kafdrop.model.TopicVO; import kafdrop.util.Deserializers; +import kafdrop.util.Serializers; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.producer.RecordMetadata; + import java.util.Collection; import java.util.Date; import java.util.List; @@ -79,5 +83,7 @@ SearchResultsVO searchMessages(String topic, */ void deleteTopic(String topic); + RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers); + List getAcls(); } diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 790052a6..d5fabfe6 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -21,18 +21,21 @@ import kafdrop.model.AclVO; import kafdrop.model.BrokerVO; import kafdrop.model.ClusterSummaryVO; -import kafdrop.model.ConsumerPartitionVO; -import kafdrop.model.ConsumerTopicVO; import kafdrop.model.ConsumerVO; +import kafdrop.model.ConsumerTopicVO; +import kafdrop.model.ConsumerPartitionVO; +import kafdrop.model.CreateMessageVO; import kafdrop.model.CreateTopicVO; import kafdrop.model.MessageVO; import kafdrop.model.SearchResultsVO; -import kafdrop.model.TopicPartitionVO; import kafdrop.model.TopicVO; +import kafdrop.model.TopicPartitionVO; +import kafdrop.util.Serializers; import kafdrop.util.Deserializers; import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; @@ -40,6 +43,7 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -47,10 +51,10 @@ import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.Map.Entry; import java.util.function.Function; import java.util.stream.Collectors; @@ -64,9 +68,13 @@ public final class KafkaMonitorImpl implements KafkaMonitor { private final KafkaHighLevelAdminClient highLevelAdminClient; - public KafkaMonitorImpl(KafkaHighLevelConsumer highLevelConsumer, KafkaHighLevelAdminClient highLevelAdminClient) { + private final KafkaHighLevelProducer highLevelProducer; + + public KafkaMonitorImpl(KafkaHighLevelConsumer highLevelConsumer, KafkaHighLevelAdminClient highLevelAdminClient, + KafkaHighLevelProducer highLevelProducer) { this.highLevelConsumer = highLevelConsumer; this.highLevelAdminClient = highLevelAdminClient; + this.highLevelProducer = highLevelProducer; } @Override @@ -419,4 +427,9 @@ private List getConsumerOffsets(Set topics) { .filter(not(ConsumerGroupOffsets::isEmpty)) .collect(Collectors.toList()); } + + @Override + public RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers) { + return highLevelProducer.publishMessage(message, serializers); + } } diff --git a/src/main/java/kafdrop/service/KafkaProducerException.java b/src/main/java/kafdrop/service/KafkaProducerException.java new file mode 100644 index 00000000..6e38bac1 --- /dev/null +++ b/src/main/java/kafdrop/service/KafkaProducerException.java @@ -0,0 +1,16 @@ +package kafdrop.service; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) +public class KafkaProducerException extends RuntimeException { + + public KafkaProducerException(Throwable exception) { + super(exception); + } + + public KafkaProducerException(String message) { + super(message); + } +} diff --git a/src/main/java/kafdrop/util/AvroMessageSerializer.java b/src/main/java/kafdrop/util/AvroMessageSerializer.java new file mode 100644 index 00000000..a41bd97b --- /dev/null +++ b/src/main/java/kafdrop/util/AvroMessageSerializer.java @@ -0,0 +1,36 @@ +package kafdrop.util; + +import java.util.HashMap; + +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; + +public class AvroMessageSerializer implements MessageSerializer { + + private final String topicName; + private final KafkaAvroSerializer serializer; + + public AvroMessageSerializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) { + this.topicName = topicName; + this.serializer = getSerializer(schemaRegistryUrl, schemaRegistryAuth); + } + + @Override + public byte[] serializeMessage(String value) { + final var bytes = value.getBytes(); + return serializer.serialize(topicName, bytes); + } + + private KafkaAvroSerializer getSerializer(String schemaRegistryUrl, String schemaRegistryAuth) { + final var config = new HashMap(); + config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + if (schemaRegistryAuth != null) { + config.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + config.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth); + } + final var kafkaAvroSerializer = new KafkaAvroSerializer(); + kafkaAvroSerializer.configure(config, false); + return kafkaAvroSerializer; + } + +} diff --git a/src/main/java/kafdrop/util/DefaultMessageSerializer.java b/src/main/java/kafdrop/util/DefaultMessageSerializer.java new file mode 100644 index 00000000..4c209106 --- /dev/null +++ b/src/main/java/kafdrop/util/DefaultMessageSerializer.java @@ -0,0 +1,10 @@ +package kafdrop.util; + +public class DefaultMessageSerializer implements MessageSerializer { + + @Override + public byte[] serializeMessage(String value) { + return value.getBytes(); + } + +} diff --git a/src/main/java/kafdrop/util/MessageSerializer.java b/src/main/java/kafdrop/util/MessageSerializer.java new file mode 100644 index 00000000..6c77915e --- /dev/null +++ b/src/main/java/kafdrop/util/MessageSerializer.java @@ -0,0 +1,6 @@ +package kafdrop.util; + +@FunctionalInterface +public interface MessageSerializer { + byte[] serializeMessage(String value); +} diff --git a/src/main/java/kafdrop/util/MsgPackMessageSerializer.java b/src/main/java/kafdrop/util/MsgPackMessageSerializer.java new file mode 100644 index 00000000..a39543b5 --- /dev/null +++ b/src/main/java/kafdrop/util/MsgPackMessageSerializer.java @@ -0,0 +1,26 @@ +package kafdrop.util; + +import java.io.IOException; + +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MsgPackMessageSerializer implements MessageSerializer { + + private static final Logger LOGGER = LoggerFactory.getLogger(MsgPackMessageSerializer.class); + + @Override + public byte[] serializeMessage(String value) { + try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { + packer.packString(value); + return packer.toByteArray(); + } catch (IOException e) { + final String errorMsg = "Unable to pack msgpack message"; + LOGGER.error(errorMsg, e); + throw new DeserializationException(errorMsg); + } + } + +} diff --git a/src/main/java/kafdrop/util/ProtobufMessageSerializer.java b/src/main/java/kafdrop/util/ProtobufMessageSerializer.java new file mode 100644 index 00000000..12a26449 --- /dev/null +++ b/src/main/java/kafdrop/util/ProtobufMessageSerializer.java @@ -0,0 +1,72 @@ +package kafdrop.util; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.DescriptorProtos.FileDescriptorProto; +import com.google.protobuf.DescriptorProtos.FileDescriptorSet; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.google.protobuf.Descriptors.FileDescriptor; + +public class ProtobufMessageSerializer implements MessageSerializer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufMessageSerializer.class); + private final String fullDescFile; + private final String msgTypeName; + + public ProtobufMessageSerializer(String fullDescFile, String msgTypeName) { + this.fullDescFile = fullDescFile; + this.msgTypeName = msgTypeName; + } + + @Override + public byte[] serializeMessage(String value) { + try (InputStream input = new FileInputStream(new File(fullDescFile))) { + FileDescriptorSet set = FileDescriptorSet.parseFrom(input); + + List fileDescriptors = new ArrayList<>(); + for (FileDescriptorProto ffdp : set.getFileList()) { + FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(ffdp, + fileDescriptors.toArray(new FileDescriptor[fileDescriptors.size()])); + fileDescriptors.add(fileDescriptor); + } + + final var descriptors = + fileDescriptors.stream().flatMap(desc -> desc.getMessageTypes().stream()).collect(Collectors.toList()); + + final var messageDescriptor = descriptors.stream().filter(desc -> msgTypeName.equals(desc.getName())).findFirst(); + if (messageDescriptor.isEmpty()) { + final String errorMsg = String.format("Can't find specific message type: %s", msgTypeName); + LOGGER.error(errorMsg); + throw new SerializationException(errorMsg); + } + + return DynamicMessage.parseFrom(messageDescriptor.get(), value.getBytes()).toByteArray(); + + } catch (FileNotFoundException e) { + final String errorMsg = String.format("Couldn't open descriptor file: %s", fullDescFile); + LOGGER.error(errorMsg, e); + throw new SerializationException(errorMsg); + } catch (IOException e) { + final String errorMsg = "Can't decode Protobuf message"; + LOGGER.error(errorMsg, e); + throw new SerializationException(errorMsg); + } catch (DescriptorValidationException e) { + final String errorMsg = String.format("Can't compile proto message type: %s", msgTypeName); + LOGGER.error(errorMsg, e); + throw new SerializationException(errorMsg); + } + } + +} diff --git a/src/main/java/kafdrop/util/SerializationException.java b/src/main/java/kafdrop/util/SerializationException.java new file mode 100644 index 00000000..ac56b3dc --- /dev/null +++ b/src/main/java/kafdrop/util/SerializationException.java @@ -0,0 +1,14 @@ +package kafdrop.util; + +public class SerializationException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public SerializationException(String msg) { + super(msg); + } + +} diff --git a/src/main/java/kafdrop/util/Serializers.java b/src/main/java/kafdrop/util/Serializers.java new file mode 100644 index 00000000..7fb1cced --- /dev/null +++ b/src/main/java/kafdrop/util/Serializers.java @@ -0,0 +1,21 @@ +package kafdrop.util; + +public class Serializers { + + private final MessageSerializer keySerializer; + private final MessageSerializer valueSerializer; + + public Serializers(MessageSerializer keySerializer, MessageSerializer valueSerializer) { + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + } + + public MessageSerializer getKeySerializer() { + return keySerializer; + } + + public MessageSerializer getValueSerializer() { + return valueSerializer; + } + +} diff --git a/src/main/resources/templates/message-inspector.ftlh b/src/main/resources/templates/message-inspector.ftlh index 0319786a..c97576bf 100644 --- a/src/main/resources/templates/message-inspector.ftlh +++ b/src/main/resources/templates/message-inspector.ftlh @@ -29,6 +29,11 @@ #partitionSizes { margin-left: 16px; } + + .top-nav-buttons{ + margin-left: 16px; + margin-top: 16px; + } .badge { margin-right: 5px; @@ -84,6 +89,54 @@ id="partitionSize">${curPartition.size - curPartition.firstOffset} + + + + + +
@@ -162,6 +215,7 @@
   +
@@ -217,4 +271,4 @@ -<@template.footer/> \ No newline at end of file +<@template.footer/>