Skip to content

Commit

Permalink
Merge pull request #23 from abes-esr/develop
Browse files Browse the repository at this point in the history
merge dev to test
  • Loading branch information
EryneKL authored Dec 20, 2023
2 parents 5ecbc45 + 91ee98a commit 74e0a9f
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 111 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.bad
*.properties
hibernate.cfg.xml
HELP.md
Expand Down Expand Up @@ -43,4 +44,4 @@ hs_err_pid*

## Fichier de propriétés de test contenant les identifiants / mdp
**/test/resources/application.properties
**/resources/log4j2-localhost.xml
**/resources/log4j2-localhost.xml
1 change: 0 additions & 1 deletion src/main/java/fr/abes/logskbart/LogsKbartApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
public class LogsKbartApplication {
Expand Down
33 changes: 7 additions & 26 deletions src/main/java/fr/abes/logskbart/configuration/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapAddress;

/* @Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;*/
@Value("${topic.groupid.source}")
private String groupId;


@Bean
public ConsumerFactory<String, String> consumerLogsFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "logskbart");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
Expand All @@ -44,26 +44,7 @@ public ConsumerFactory<String, String> consumerLogsFactory() {
}

@Bean
public ConsumerFactory<String, String> consumerKbartFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "lignesKbart");
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,("SchedulerCoordinator"+ UUID.randomUUID()));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaKbartListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerKbartFactory());
/*factory.setBatchListener(true);
factory.setConcurrency(1);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);*/
return factory;
public Map<String, Timestamp> lastTimeStampByFilename() {
return new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/fr/abes/logskbart/dto/LigneLogDto.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
public class LigneLogDto {
private String level;
private String message;
private Integer nbLine;
}
3 changes: 0 additions & 3 deletions src/main/java/fr/abes/logskbart/dto/LogDto.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package fr.abes.logskbart.dto;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
@Getter
@Setter
@NoArgsConstructor
public class Kbart2KafkaDto {
public class LogKbartDto {
private InstantDto instant;
private String thread;
private String level;
Expand All @@ -17,6 +17,7 @@ public class Kbart2KafkaDto {
private String loggerFqcn;
private Integer threadId;
private Integer threadPriority;
private Integer nbLine;

@Getter @Setter
private static class InstantDto {
Expand Down
25 changes: 24 additions & 1 deletion src/main/java/fr/abes/logskbart/entity/LogKbart.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import fr.abes.logskbart.utils.Level;
import jakarta.persistence.*;
import lombok.*;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.Date;
Expand All @@ -12,6 +15,7 @@
@Getter
@Setter
@NoArgsConstructor
@Slf4j
public class LogKbart implements Serializable {
@Id
@Column(name = "ID")
Expand Down Expand Up @@ -49,5 +53,24 @@ public class LogKbart implements Serializable {
@Column(name = "THREAD_PRIORITY")
private Integer threadPriority;

@Column(name = "NB_LINE")
private Integer nbLine;

@Override
public String toString() {
return "LogKbart{" +
"packageName='" + packageName + '\'' +
", timestamp=" + timestamp +
", thread='" + thread + '\'' +
", level=" + level +
", message='" + message + '\'' +
", loggerFqcn='" + loggerFqcn + '\'' +
", nbLine='" + nbLine + '\'' +

'}';
}

public void log(){
log.debug( this.level +" : " + this);
}
}
125 changes: 56 additions & 69 deletions src/main/java/fr/abes/logskbart/kafka/LogsListener.java
Original file line number Diff line number Diff line change
@@ -1,74 +1,91 @@
package fr.abes.logskbart.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import fr.abes.logskbart.dto.Kbart2KafkaDto;
import fr.abes.logskbart.dto.LogKbartDto;
import fr.abes.logskbart.entity.LogKbart;
import fr.abes.logskbart.repository.LogKbartRepository;
import fr.abes.logskbart.utils.UtilsMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
@RequiredArgsConstructor
public class LogsListener {

@Value("${topic.name.source.error}")
private String topicErrorKbart;
private final ObjectMapper mapper;

@Value("${topic.name.source.info}")
private String topicInfoKbart;
private final UtilsMapper logsMapper;

@Value("${topic.name.source.endoftraitement}")
private String topicEndOfTraitement;
private final LogKbartRepository repository;

@Autowired
private ObjectMapper mapper;
private final Map<String, Timestamp> lastTimeStampByFilename;

@Autowired
private UtilsMapper logsMapper;
public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogKbartRepository repository, Map<String, Timestamp> lastTimeStampByFilename) {
this.mapper = mapper;
this.logsMapper = logsMapper;
this.repository = repository;
this.lastTimeStampByFilename = lastTimeStampByFilename;
}

@Autowired
private LogKbartRepository repository;

/**
* Ecoute les topic de log d'erreurs et de fin de traitement bestPpn et génère un fichier err pour chaque fichier kbart
*
* @param message le message kafka
* @throws IOException exception levée
*/
@KafkaListener(topics = {"${topic.name.source.error}", "${topic.name.source.info}", "${topic.name.source.endoftraitement}"}, groupId = "logskbart", containerFactory = "kafkaLogsListenerContainerFactory")
@KafkaListener(topics = {"${topic.name.source.error}", "${topic.name.source.info}"}, groupId = "${topic.groupid.source}", containerFactory = "kafkaLogsListenerContainerFactory")
public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord<String, String> message) throws IOException {

if (message.topic().equals(topicErrorKbart) || message.topic().equals(topicInfoKbart)) {
Kbart2KafkaDto dto = mapper.readValue(message.value(), Kbart2KafkaDto.class);
LogKbart entity = logsMapper.map(dto, LogKbart.class);
Timestamp timestamp = new Timestamp(message.timestamp());
entity.setTimestamp(new Date(timestamp.getTime()));
entity.setPackageName(message.key().replaceAll("\\[line\\s:\\s\\d+\\]", ""));
LogKbartDto dto = mapper.readValue(message.value(), LogKbartDto.class);
LogKbart logKbart = logsMapper.map(dto, LogKbart.class);

String[] listMessage = message.key().split(";");
log.debug(Arrays.toString(listMessage));
// recuperation de l'heure a laquelle le message a ete envoye
Timestamp currentTimestamp = new Timestamp(message.timestamp());
logKbart.setTimestamp(new Date(currentTimestamp.getTime()));
logKbart.setPackageName(listMessage[0]);
String nbLineOrigine = (listMessage.length > 1) ? listMessage[1] : "";
logKbart.setNbLine(Integer.parseInt((nbLineOrigine.isEmpty() ? "-1" : nbLineOrigine) ));

logKbart.log();

// Vérifie qu'un fichier portant le nom du kbart en cours existe
if (!logKbart.getPackageName().contains("ctx:package") && !logKbart.getPackageName().contains("_FORCE")) {
Path tempPath = Path.of("tempLog");
if(!Files.exists(tempPath)) {
Files.createDirectory(tempPath);
}
Path of = Path.of("tempLog" + File.separator + logKbart.getPackageName().replace(".tsv", ".bad"));

// Si la ligne de log sur le topic errorkbart2kafka est de type ERROR
if (entity.getLevel().toString().equals("ERROR")) {
String nbrLine = message.key().substring(message.key().indexOf(".tsv")+4).replaceAll("\\[line\\s:\\s", "").replaceAll("]", "");
String fileName = message.key().replaceAll(".tsv\\[line\\s:\\s\\d+\\]", ".bad");
String line = nbrLine + "\t" + dto.getMessage();
if (logKbart.getLevel().toString().equals("ERROR")) {
if (lastTimeStampByFilename.get(logKbart.getPackageName()) != null) {
Timestamp LastTimestampPlusTwoMinutes = new Timestamp(lastTimeStampByFilename.get(logKbart.getPackageName()).getTime() + TimeUnit.MINUTES.toMillis(2 ));

// Si ca fait 2min qu'on a pas recu de message pour ce fichier
if (currentTimestamp.after(LastTimestampPlusTwoMinutes)) {
log.debug("Suppression fichier " + logKbart.getPackageName() + " si existe");
Files.deleteIfExists(of);
}
}
lastTimeStampByFilename.put(logKbart.getPackageName(), currentTimestamp);

String line = nbLineOrigine + "\t" + logKbart.getMessage();

// Vérifie qu'un fichier portant le nom du kbart en cours existe
Path of = Path.of(fileName);
if (Files.exists(of)) {
// Inscrit la ligne dedans
Files.write(of, (line + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
Expand All @@ -87,39 +104,9 @@ public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord<String, Stri
}
}
}

// Inscrit l'entity en BDD
repository.save(entity);

} else if (message.topic().equals(topicEndOfTraitement)) { // Si la ligne sur le topic bestppn.endoftraitment contient OK

// Créer un nouveau Path avec le FileName (en remplaçant l'extension par .err)
Path source = null;
for (Header header : message.headers().toArray()) {
if (header.key().equals("FileName")) {
source = Path.of(new String(header.value()).replaceAll(".tsv", ".bad"));
break;
}
}

// Copie le fichier existant vers le répertoire temporaire en ajoutant sa date de création
if (source != null && Files.exists(source)) {
LocalDateTime time = LocalDateTime.now();
DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss", Locale.FRANCE);
String date = format.format(time);

// Vérification du chemin et création si inexistant
String tempLog = "tempLog/";
File chemin = new File("tempLog/");
if (!chemin.isDirectory()) {
Files.createDirectory(Paths.get(tempLog));
}
Path target = Path.of("tempLog\\" + date + "_" + source);

// Déplacement du fichier
Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
log.info("Fichier de log transféré dans le dossier temporaire.");
}
}

// Inscrit l'entity en BDD
repository.save(logKbart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
@Repository
@LogsBdConfiguration
public interface LogKbartRepository extends JpaRepository<LogKbart, Long> {
List<LogKbart> findAllByPackageNameAndTimestampBetween(String filename, Date debut, Date fin);
List<LogKbart> findAllByPackageNameAndTimestampBetweenOrderByNbLineAscTimestampAsc(String filename, Date debut, Date fin);
}
2 changes: 1 addition & 1 deletion src/main/java/fr/abes/logskbart/service/LogsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ public List<LogKbart> getLogKbartForPackage(String packageName, Date date) {
Calendar dateFin = (Calendar) dateChargement.clone();
dateFin.add(Calendar.DAY_OF_MONTH, 1);
log.debug("packageName {}, Date début {}, Date fin {}", packageName, dateChargement.getTime(), dateFin.getTime());
return repository.findAllByPackageNameAndTimestampBetween(packageName, dateChargement.getTime(), dateFin.getTime());
return repository.findAllByPackageNameAndTimestampBetweenOrderByNbLineAscTimestampAsc(packageName, dateChargement.getTime(), dateFin.getTime());
}
}
10 changes: 6 additions & 4 deletions src/main/java/fr/abes/logskbart/utils/LogsMapper.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package fr.abes.logskbart.utils;

import fr.abes.logskbart.dto.Kbart2KafkaDto;
import fr.abes.logskbart.dto.LigneLogDto;
import fr.abes.logskbart.dto.LogKbartDto;
import fr.abes.logskbart.entity.LogKbart;
import org.modelmapper.Converter;
import org.modelmapper.spi.MappingContext;
Expand All @@ -21,9 +21,9 @@ public LogsMapper(UtilsMapper mapper) {
*/
@Bean
public void converterInfoBaconDtoToLogKbart() {
Converter<Kbart2KafkaDto, LogKbart> myConverter = new Converter<Kbart2KafkaDto, LogKbart>() {
public LogKbart convert(MappingContext<Kbart2KafkaDto, LogKbart> context) {
Kbart2KafkaDto source = context.getSource();
Converter<LogKbartDto, LogKbart> myConverter = new Converter<LogKbartDto, LogKbart>() {
public LogKbart convert(MappingContext<LogKbartDto, LogKbart> context) {
LogKbartDto source = context.getSource();
LogKbart target = new LogKbart();
target.setLevel(Level.valueOf(source.getLevel()));
target.setMessage(source.getMessage());
Expand All @@ -33,6 +33,7 @@ public LogKbart convert(MappingContext<Kbart2KafkaDto, LogKbart> context) {
target.setThreadId(source.getThreadId());
target.setThreadPriority(source.getThreadPriority());
target.setLoggerName(source.getLoggerName());
target.setNbLine(source.getNbLine());
return target;
}
};
Expand All @@ -47,6 +48,7 @@ public LigneLogDto convert(MappingContext<LogKbart, LigneLogDto> context) {
LigneLogDto target = new LigneLogDto();
target.setLevel(source.getLevel().toString());
target.setMessage(source.getMessage());
target.setNbLine(source.getNbLine());
return target;
}
};
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ spring.sql.logsdb.init.mode=never
logging.config=classpath:log4j2-dev.xml
logging.level.root=INFO
logging.level.fr.abes=DEBUG

topic.groupid.source=logskbartConsumer
Loading

0 comments on commit 74e0a9f

Please sign in to comment.