From 34b03efd680559e49d9e0a48e7c223ea09a03f67 Mon Sep 17 00:00:00 2001 From: le-maire Date: Tue, 30 Jan 2024 11:15:14 +0100 Subject: [PATCH] Ajout du job d'ajout des personnes --- .../configuration/BatchConfiguration.java | 58 ++++-- .../AjouterThesesPersonnesProcessor.java | 178 ++++++++++++++++++ .../SupprimerThesesPersonneProcessor.java | 10 +- .../utils/MappingJobName.java | 2 + .../utils/PersonneCacheUtils.java | 15 ++ 5 files changed, 239 insertions(+), 24 deletions(-) create mode 100644 src/main/java/fr/abes/theses_batch_indexation/processor/AjouterThesesPersonnesProcessor.java diff --git a/src/main/java/fr/abes/theses_batch_indexation/configuration/BatchConfiguration.java b/src/main/java/fr/abes/theses_batch_indexation/configuration/BatchConfiguration.java index 1730d04..67e34bc 100644 --- a/src/main/java/fr/abes/theses_batch_indexation/configuration/BatchConfiguration.java +++ b/src/main/java/fr/abes/theses_batch_indexation/configuration/BatchConfiguration.java @@ -95,11 +95,11 @@ public Job jobIndexationPersonnesDeBddVersES(Tasklet initialiserIndexESTasklet, @Bean public Job jobIndexationRecherchePersonnesDansES(Step stepIndexRecherchePersonnesDansBDD, - Tasklet initialiserIndexESTasklet, - Tasklet initiliserIndexBDDTasklet, - Tasklet indexerPersonnesDansESTasklet, - Tasklet chargerOaiSetsTasklet, - JobTheseCompletionNotificationListener listener) { + Tasklet initialiserIndexESTasklet, + Tasklet initiliserIndexBDDTasklet, + Tasklet indexerPersonnesDansESTasklet, + Tasklet chargerOaiSetsTasklet, + JobTheseCompletionNotificationListener listener) { return jobs.get("indexationRecherchePersonnesDansES").incrementer(new RunIdIncrementer()) .listener(listener) .start(stepInitiliserIndexBDDTasklet(initiliserIndexBDDTasklet)) @@ -112,9 +112,9 @@ public Job jobIndexationRecherchePersonnesDansES(Step stepIndexRecherchePersonne @Bean public Job jobIndexationThematiquesDansES(Step stepIndexThematiquesDansES, - JobRepository jobRepository, - Tasklet initialiserIndexESTasklet, - JobTheseCompletionNotificationListener listener) { + JobRepository jobRepository, + Tasklet initialiserIndexESTasklet, + JobTheseCompletionNotificationListener listener) { log.debug("debut du job indexation des thematiques dans ES..."); return jobs.get("indexationThematiquesDansES").repository(jobRepository).incrementer(new RunIdIncrementer()) @@ -128,8 +128,8 @@ public Job jobIndexationThematiquesDansES(Step stepIndexThematiquesDansES, @Bean public Job jobSuppressionThesesDansES(Step stepSupprimeThesesOuThematiquesDansES, - JobRepository jobRepository, - JobTheseCompletionNotificationListener listener) { + JobRepository jobRepository, + JobTheseCompletionNotificationListener listener) { log.debug("debut du job de suppression des theses dans ES..."); return jobs.get("suppressionThesesDansES").repository(jobRepository).incrementer(new RunIdIncrementer()) @@ -140,8 +140,8 @@ public Job jobSuppressionThesesDansES(Step stepSupprimeThesesOuThematiquesDansES @Bean public Job jobSuppressionThematiquesDansES(Step stepSupprimeThesesOuThematiquesDansES, - JobRepository jobRepository, - JobTheseCompletionNotificationListener listener) { + JobRepository jobRepository, + JobTheseCompletionNotificationListener listener) { log.debug("debut du job de suppression des thématiques dans ES..."); return jobs.get("suppressionThematiquesDansES").repository(jobRepository).incrementer(new RunIdIncrementer()) @@ -168,6 +168,21 @@ public Job jobSuppressionPersonnesDansES(Step stepSupprimePersonnesDansES, .build(); } + @Bean + public Job jobAjoutPersonnesDansES(Step stepAjouterPersonnesDansES, + JobRepository jobRepository, + Tasklet initiliserIndexBDDTasklet, + Tasklet indexerPersonnesDansESTasklet, + Tasklet chargerOaiSetsTasklet, + JobTheseCompletionNotificationListener listener) { + return jobs.get("ajoutPersonnesDansES").repository(jobRepository).incrementer(new RunIdIncrementer()) + .listener(listener) + .start(stepInitiliserIndexBDDTasklet(initiliserIndexBDDTasklet)) + .next(stepChargerListeOaiSets(chargerOaiSetsTasklet)) + .next(stepAjouterPersonnesDansES) + .next(stepIndexerPersonnesDansESTasklet(indexerPersonnesDansESTasklet)) + .build(); + } // ---------- STEP -------------------------------------------- @@ -198,6 +213,7 @@ public Step stepIndexThematiquesDansES(@Qualifier("jdbcPagingCustomReader") Jdbc .throttleLimit(config.getThrottle()) .build(); } + @Bean public Step stepIndexPersonnesDansBDD(@Qualifier("jdbcPagingCustomReader") JdbcPagingCustomReader itemReader, @Qualifier("personneItemProcessor") ItemProcessor itemProcessor, @@ -208,6 +224,7 @@ public Step stepIndexPersonnesDansBDD(@Qualifier("jdbcPagingCustomReader") JdbcP .writer(itemWriter) .build(); } + @Bean public Step stepIndexRecherchePersonnesDansBDD(@Qualifier("jdbcPagingCustomReader") JdbcPagingCustomReader itemReader, @Qualifier("recherchePersonneItemProcessor") ItemProcessor itemProcessor, @@ -248,7 +265,7 @@ public Step stepChargerListeOaiSets(@Qualifier("chargerOaiSetsTasklet") Tasklet @Bean public Step stepSupprimeThesesOuThematiquesDansES(@Qualifier("jdbcPagingDeleteReader") JdbcPagingDeleteReader itemReader, - @Qualifier("ESDeleteWriter") ItemWriter itemWriter) { + @Qualifier("ESDeleteWriter") ItemWriter itemWriter) { return stepBuilderFactory.get("stepSuppressionThese").chunk(config.getChunk()) .listener(theseWriteListener) .reader(itemReader) @@ -260,8 +277,19 @@ public Step stepSupprimeThesesOuThematiquesDansES(@Qualifier("jdbcPagingDeleteRe @Bean public Step stepSupprimePersonnesDansES(@Qualifier("jdbcPagingDeleteReader") JdbcPagingDeleteReader itemReader, - @Qualifier("supprimerThesesPersonneProcessor") ItemProcessor itemProcessor) { - return stepBuilderFactory.get("stepSupprimePersonnes").chunk(config.getChunk()) + @Qualifier("supprimerThesesPersonneProcessor") ItemProcessor itemProcessor) { + return stepBuilderFactory.get("stepSupprimePersonnesDansES").chunk(1) + .listener(theseWriteListener) + .reader(itemReader) + .processor(itemProcessor) + .taskExecutor(taskExecutor()) + .build(); + } + + @Bean + public Step stepAjouterPersonnesDansES(JdbcPagingCustomReader itemReader, + @Qualifier("ajouterThesesPersonnesProcessor") ItemProcessor itemProcessor) { + return stepBuilderFactory.get("stepAjouterPersonnesDansES").chunk(1) .listener(theseWriteListener) .reader(itemReader) .processor(itemProcessor) diff --git a/src/main/java/fr/abes/theses_batch_indexation/processor/AjouterThesesPersonnesProcessor.java b/src/main/java/fr/abes/theses_batch_indexation/processor/AjouterThesesPersonnesProcessor.java new file mode 100644 index 0000000..0b7bb88 --- /dev/null +++ b/src/main/java/fr/abes/theses_batch_indexation/processor/AjouterThesesPersonnesProcessor.java @@ -0,0 +1,178 @@ +package fr.abes.theses_batch_indexation.processor; + +import co.elastic.clients.elasticsearch._types.query_dsl.Query; +import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders; +import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import fr.abes.theses_batch_indexation.configuration.ElasticClient; +import fr.abes.theses_batch_indexation.database.TheseModel; +import fr.abes.theses_batch_indexation.dto.personne.PersonneMapee; +import fr.abes.theses_batch_indexation.dto.personne.PersonneModelES; +import fr.abes.theses_batch_indexation.dto.personne.PersonneModelESAvecId; +import fr.abes.theses_batch_indexation.model.oaisets.Set; +import fr.abes.theses_batch_indexation.model.tef.Mets; +import fr.abes.theses_batch_indexation.utils.MappingJobName; +import fr.abes.theses_batch_indexation.utils.PersonneCacheUtils; +import fr.abes.theses_batch_indexation.utils.XMLJsonMarshalling; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Component +public class AjouterThesesPersonnesProcessor implements ItemProcessor, StepExecutionListener { + MappingJobName mappingJobName = new MappingJobName(); + private final XMLJsonMarshalling marshall; + String nomIndex; + + @Value("${table.personne.name}") + private String tablePersonneName; + + List ppnList = new ArrayList<>(); + java.util.Set nntSet = new HashSet<>(); + List oaiSets; + PersonneCacheUtils personneCacheUtils = new PersonneCacheUtils(); + + private final JdbcTemplate jdbcTemplate; + + public AjouterThesesPersonnesProcessor(XMLJsonMarshalling marshall, JdbcTemplate jdbcTemplate) { + this.marshall = marshall; + this.jdbcTemplate = jdbcTemplate; + } + + @Override + public void beforeStep(StepExecution stepExecution) { + JobExecution jobExecution = stepExecution.getJobExecution(); + nomIndex = mappingJobName.getNomIndexES().get(jobExecution.getJobInstance().getJobName()); + this.oaiSets = (List) jobExecution.getExecutionContext().get("oaiSets"); + + this.personneCacheUtils = new PersonneCacheUtils( + jdbcTemplate, + tablePersonneName, + nomIndex + ); + } + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + return null; + } + + @Override + public TheseModel process(TheseModel theseModel) throws Exception { + + // Initialisation de la table en BDD (donc pas de multi-thread possible) + personneCacheUtils.initialisePersonneCacheBDD(); + + //TODO: rechercher les personnes qui ont cette thèse dans leur list et suppimer les personnes sans nnt + + // sortir la liste des personnes de la thèse + List personnesTef = getPersonnesModelESFromTef(theseModel.getId()); + + // recuperation des ppn + ppnList = personnesTef.stream().filter(PersonneModelES::isHas_idref).map(PersonneModelES::getPpn) + .collect(Collectors.toList()); + + // récupérer les personnes dans ES + List personnesES = getPersonnesModelESFromES(ppnList); + + // recuperation des ids des theses + personnesES.stream().map(PersonneModelES::getTheses_id).forEach(nntSet::addAll); + + // Ajout de l'id de thèse qu'on indexe + // TODO: vérifier qu'on ne transforme pas un sujet en NNT, dans ce cas, il faut supprimer IdSujet de nntSet + nntSet.add(theseModel.getId()); + + // Ré-indexer la liste des thèses + // Récupérer les theses avec JDBCTemplate + List theseModels = personneCacheUtils.getTheses(nntSet); + + for (TheseModel theseModelToAdd : theseModels) { + // Utiliser PersonneMappee + Mets mets = marshall.chargerMets(new ByteArrayInputStream(theseModelToAdd.getDoc().getBytes())); + PersonneMapee personneMapee = new PersonneMapee(mets, theseModelToAdd.getId(), oaiSets); + theseModelToAdd.setPersonnes(personneMapee.getPersonnes()); + } + + // MàJ dans la BDD + for (TheseModel theseModelToAddBdd : theseModels) { + for (PersonneModelES personneModelES : theseModelToAddBdd.getPersonnes()) { + if (personneCacheUtils.estPresentDansBDD(personneModelES.getPpn())) { + personneCacheUtils.updatePersonneDansBDD(personneModelES); + } else { + personneCacheUtils.ajoutPersonneDansBDD(personneModelES); + } + } + } + + // Nettoyer la table personne_cache des personnes qui ne sont pas dans ppnList + List personneModelEsEnBDD = personneCacheUtils.getAllPersonneModelBDD(); + + personneModelEsEnBDD.forEach(p -> { + if (p.isHas_idref() && !ppnList.contains(p.getPpn())) { + try { + personneCacheUtils.deletePersonneBDD(p.getPpn()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + + jdbcTemplate.execute("commit"); + // Rechargement de la BDD vers ES (à faire avec le job) + + return theseModel; + } + + private List getPersonnesModelESFromES(List ppnList) { + List personneModelES = new ArrayList<>(); + + ppnList.forEach(p -> { + SearchResponse response = null; + try { + TermQuery termQuery = QueryBuilders.term().field("_id").value(p).build(); + Query query = new Query.Builder().term(termQuery).build(); + + response = ElasticClient.getElasticsearchClient().search(s -> s + .index(nomIndex.toLowerCase()) + .query(query), + PersonneModelES.class + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (response.hits().total().value() > 0) { + personneModelES.addAll(response.hits().hits().stream().map(per -> per.source() + ).collect(Collectors.toList())); + } + + }); + + return personneModelES; + } + + private List getPersonnesModelESFromTef(String id) throws Exception { + + java.util.Set nnt = new HashSet<>(); + nnt.add(id); + TheseModel theseModelToAdd = personneCacheUtils.getTheses(nnt).get(0); + + Mets mets = marshall.chargerMets(new ByteArrayInputStream(theseModelToAdd.getDoc().getBytes())); + PersonneMapee personneMapee = new PersonneMapee(mets, theseModelToAdd.getId(), oaiSets); + + return personneMapee.getPersonnes(); + } +} diff --git a/src/main/java/fr/abes/theses_batch_indexation/processor/SupprimerThesesPersonneProcessor.java b/src/main/java/fr/abes/theses_batch_indexation/processor/SupprimerThesesPersonneProcessor.java index db7a9a0..d8aa46e 100644 --- a/src/main/java/fr/abes/theses_batch_indexation/processor/SupprimerThesesPersonneProcessor.java +++ b/src/main/java/fr/abes/theses_batch_indexation/processor/SupprimerThesesPersonneProcessor.java @@ -97,7 +97,7 @@ public TheseModel process(TheseModel theseModel) throws Exception { // Ré-indexer la liste des thèses // Récupérer les theses avec JDBCTemplate - List theseModels = getTheses(nntSet); + List theseModels = personneCacheUtils.getTheses(nntSet); for (TheseModel theseModelToAdd : theseModels) { // Utiliser PersonneMappee @@ -160,15 +160,7 @@ private void deleteAllPersonneES(List ppns) { }); } - private List getTheses(java.util.Set nntSet) { - if (nntSet.isEmpty()) { - return new ArrayList<>(); - } - String nnts = nntSet.stream().map(i -> "'" + i + "', ").reduce(String::concat).get(); - nnts = nnts.substring(0, nnts.lastIndexOf("', ") + 1); - return jdbcTemplate.query("select * from Document where nnt in (" + nnts + ")", new TheseRowMapper()); - } private void deletePersonnesSansPPN(List personnes) { personnes.forEach(p -> diff --git a/src/main/java/fr/abes/theses_batch_indexation/utils/MappingJobName.java b/src/main/java/fr/abes/theses_batch_indexation/utils/MappingJobName.java index 7848e02..1c39c5b 100644 --- a/src/main/java/fr/abes/theses_batch_indexation/utils/MappingJobName.java +++ b/src/main/java/fr/abes/theses_batch_indexation/utils/MappingJobName.java @@ -18,6 +18,7 @@ public MappingJobName() { nomTableES.put("indexationThesesDansES", TableIndexationES.indexation_es_these); nomTableES.put("indexationPersonnesDansES", TableIndexationES.indexation_es_personne); nomTableES.put("indexationPersonnesDeBddVersES", TableIndexationES.indexation_es_personne); + nomTableES.put("ajoutPersonnesDansES", TableIndexationES.indexation_es_personne); nomTableES.put("indexationRecherchePersonnesDansES", TableIndexationES.indexation_es_recherche_personne); nomTableES.put("indexationThematiquesDansES", TableIndexationES.indexation_es_thematique); nomTableES.put("suppressionThesesDansES", TableIndexationES.suppression_es_these); @@ -29,6 +30,7 @@ public MappingJobName() { nomIndexES.put("indexationThesesDansES", "theses"); nomIndexES.put("indexationPersonnesDansES", "personnes"); nomIndexES.put("indexationPersonnesDeBddVersES", "personnes"); + nomIndexES.put("ajoutPersonnesDansES", "personnes"); nomIndexES.put("indexationRecherchePersonnesDansES", "recherche_personnes"); nomIndexES.put("indexationThematiquesDansES", "thematiques"); nomIndexES.put("suppressionThesesDansES", "theses"); diff --git a/src/main/java/fr/abes/theses_batch_indexation/utils/PersonneCacheUtils.java b/src/main/java/fr/abes/theses_batch_indexation/utils/PersonneCacheUtils.java index 57b9297..874fcd2 100644 --- a/src/main/java/fr/abes/theses_batch_indexation/utils/PersonneCacheUtils.java +++ b/src/main/java/fr/abes/theses_batch_indexation/utils/PersonneCacheUtils.java @@ -3,6 +3,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.MismatchedInputException; +import fr.abes.theses_batch_indexation.database.TheseModel; +import fr.abes.theses_batch_indexation.database.TheseRowMapper; import fr.abes.theses_batch_indexation.dto.personne.PersonneModelES; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -10,6 +12,7 @@ import org.springframework.jdbc.core.JdbcTemplate; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -122,6 +125,18 @@ public boolean deletePersonneBDD(String ppn) throws IOException { } } + public List getTheses(java.util.Set nntSet) { + if (nntSet.isEmpty()) { + return new ArrayList<>(); + } + String nnts = nntSet.stream().map(i -> "'" + i + "', ").reduce(String::concat).get(); + nnts = nnts.substring(0, nnts.lastIndexOf("', ") + 1); + + return jdbcTemplate.query("select * from Document where nnt in (" + nnts + ")" + + "or numsujet in (" + nnts +")", + new TheseRowMapper()); + } + public static PersonneModelES mapperJson(String json) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(json, PersonneModelES.class);