# Create the test-producer topic via the broker at 68.183.95.195:9092.
bin/kafka-topics.sh --create --topic test-producer --bootstrap-server 68.183.95.195:9092
# List the topics on that broker.
bin/kafka-topics.sh --list --bootstrap-server 68.183.95.195:9092
# Show the details of the test-producer topic.
bin/kafka-topics.sh --describe --topic test-producer --bootstrap-server 68.183.95.195:9092
# Start a console producer that publishes to test-producer.
bin/kafka-console-producer.sh --topic test-producer --bootstrap-server 68.183.95.195:9092
# Start a console consumer on test-producer, reading from the beginning of the topic.
bin/kafka-console-consumer.sh --topic test-producer --from-beginning --bootstrap-server 68.183.95.195:9092
# Delete the test-producer topic.
bin/kafka-topics.sh --delete --topic test-producer --bootstrap-server 68.183.95.195:9092
- Add the Application Configuration
// Client id the producer registers with (passed as ProducerConfig.CLIENT_ID_CONFIG).
public static final String applicationID = "Producer-Demo";
// Comma-separated host:port list passed as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.
public static final String bootstrapServers = "broker-1:9092,broker-2:9092,broker-3:9092";
// Topic the demo producer writes its records to.
public static final String topicName = "test-producer";
// Number of messages the send loop produces.
public static final int numEvents = 1000;
- Configure the Producer with Producer Configuration
// Build the producer configuration from the AppConfig constants.
Properties props = new Properties();

// Connection settings: which brokers to bootstrap from and the client id to use.
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.bootstrapServers);
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, AppConfig.applicationID);

// Serializers are configured by class name: Integer keys, String values.
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- Create an instance of KafkaProducer
// The Integer key / String value type parameters match the IntegerSerializer and
// StringSerializer configured in props.
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
- Send messages to broker
for (int i = 1; i <= AppConfig.numEvents; i++) {
message = "Message-" + i;
producer.send(new ProducerRecord<>(AppConfig.topicName, i, message));
logger.info("Successfully sent message to Broker:: "+ message);
}
- Close the Producer instance
// Close the producer once every record has been handed to send().
producer.close();