Skip to content

Commit

Permalink
Ajout du job d'ajout des personnes
Browse files Browse the repository at this point in the history
  • Loading branch information
slemaire777 committed Jan 30, 2024
1 parent 4a36de3 commit 34b03ef
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ public Job jobIndexationPersonnesDeBddVersES(Tasklet initialiserIndexESTasklet,

@Bean
public Job jobIndexationRecherchePersonnesDansES(Step stepIndexRecherchePersonnesDansBDD,
Tasklet initialiserIndexESTasklet,
Tasklet initiliserIndexBDDTasklet,
Tasklet indexerPersonnesDansESTasklet,
Tasklet chargerOaiSetsTasklet,
JobTheseCompletionNotificationListener listener) {
Tasklet initialiserIndexESTasklet,
Tasklet initiliserIndexBDDTasklet,
Tasklet indexerPersonnesDansESTasklet,
Tasklet chargerOaiSetsTasklet,
JobTheseCompletionNotificationListener listener) {
return jobs.get("indexationRecherchePersonnesDansES").incrementer(new RunIdIncrementer())
.listener(listener)
.start(stepInitiliserIndexBDDTasklet(initiliserIndexBDDTasklet))
Expand All @@ -112,9 +112,9 @@ public Job jobIndexationRecherchePersonnesDansES(Step stepIndexRecherchePersonne

@Bean
public Job jobIndexationThematiquesDansES(Step stepIndexThematiquesDansES,
JobRepository jobRepository,
Tasklet initialiserIndexESTasklet,
JobTheseCompletionNotificationListener listener) {
JobRepository jobRepository,
Tasklet initialiserIndexESTasklet,
JobTheseCompletionNotificationListener listener) {
log.debug("debut du job indexation des thematiques dans ES...");

return jobs.get("indexationThematiquesDansES").repository(jobRepository).incrementer(new RunIdIncrementer())
Expand All @@ -128,8 +128,8 @@ public Job jobIndexationThematiquesDansES(Step stepIndexThematiquesDansES,

@Bean
public Job jobSuppressionThesesDansES(Step stepSupprimeThesesOuThematiquesDansES,
JobRepository jobRepository,
JobTheseCompletionNotificationListener listener) {
JobRepository jobRepository,
JobTheseCompletionNotificationListener listener) {
log.debug("debut du job de suppression des theses dans ES...");

return jobs.get("suppressionThesesDansES").repository(jobRepository).incrementer(new RunIdIncrementer())
Expand All @@ -140,8 +140,8 @@ public Job jobSuppressionThesesDansES(Step stepSupprimeThesesOuThematiquesDansES

@Bean
public Job jobSuppressionThematiquesDansES(Step stepSupprimeThesesOuThematiquesDansES,
JobRepository jobRepository,
JobTheseCompletionNotificationListener listener) {
JobRepository jobRepository,
JobTheseCompletionNotificationListener listener) {
log.debug("debut du job de suppression des thématiques dans ES...");

return jobs.get("suppressionThematiquesDansES").repository(jobRepository).incrementer(new RunIdIncrementer())
Expand All @@ -168,6 +168,21 @@ public Job jobSuppressionPersonnesDansES(Step stepSupprimePersonnesDansES,
.build();
}

/**
 * Job "ajoutPersonnesDansES": incrementally adds/updates persons in the
 * Elasticsearch person index for newly added theses.
 * <p>
 * Step sequence: (1) initialise the person cache table in the database,
 * (2) load the OAI sets, (3) rebuild the person cache for the theses to add
 * ({@code stepAjouterPersonnesDansES}), (4) index the cached persons into ES.
 * <p>
 * NOTE(review): "initiliser" is a typo, but the name matches the existing
 * tasklet bean used by the other jobs in this class — renaming would break
 * Spring's by-name injection, so it is kept as-is.
 *
 * @param stepAjouterPersonnesDansES    step that rebuilds the person cache for added theses
 * @param jobRepository                 Spring Batch job repository
 * @param initiliserIndexBDDTasklet     tasklet clearing/creating the cache table
 * @param indexerPersonnesDansESTasklet tasklet pushing the cache table content to ES
 * @param chargerOaiSetsTasklet         tasklet loading the OAI sets into the execution context
 * @param listener                      completion-notification listener
 * @return the configured job
 */
@Bean
public Job jobAjoutPersonnesDansES(Step stepAjouterPersonnesDansES,
                                   JobRepository jobRepository,
                                   Tasklet initiliserIndexBDDTasklet,
                                   Tasklet indexerPersonnesDansESTasklet,
                                   Tasklet chargerOaiSetsTasklet,
                                   JobTheseCompletionNotificationListener listener) {
    return jobs.get("ajoutPersonnesDansES").repository(jobRepository).incrementer(new RunIdIncrementer())
            .listener(listener)
            .start(stepInitiliserIndexBDDTasklet(initiliserIndexBDDTasklet))
            .next(stepChargerListeOaiSets(chargerOaiSetsTasklet))
            .next(stepAjouterPersonnesDansES)
            .next(stepIndexerPersonnesDansESTasklet(indexerPersonnesDansESTasklet))
            .build();
}


// ---------- STEP --------------------------------------------
Expand Down Expand Up @@ -198,6 +213,7 @@ public Step stepIndexThematiquesDansES(@Qualifier("jdbcPagingCustomReader") Jdbc
.throttleLimit(config.getThrottle())
.build();
}

@Bean
public Step stepIndexPersonnesDansBDD(@Qualifier("jdbcPagingCustomReader") JdbcPagingCustomReader itemReader,
@Qualifier("personneItemProcessor") ItemProcessor itemProcessor,
Expand All @@ -208,6 +224,7 @@ public Step stepIndexPersonnesDansBDD(@Qualifier("jdbcPagingCustomReader") JdbcP
.writer(itemWriter)
.build();
}

@Bean
public Step stepIndexRecherchePersonnesDansBDD(@Qualifier("jdbcPagingCustomReader") JdbcPagingCustomReader itemReader,
@Qualifier("recherchePersonneItemProcessor") ItemProcessor itemProcessor,
Expand Down Expand Up @@ -248,7 +265,7 @@ public Step stepChargerListeOaiSets(@Qualifier("chargerOaiSetsTasklet") Tasklet

@Bean
public Step stepSupprimeThesesOuThematiquesDansES(@Qualifier("jdbcPagingDeleteReader") JdbcPagingDeleteReader itemReader,
@Qualifier("ESDeleteWriter") ItemWriter itemWriter) {
@Qualifier("ESDeleteWriter") ItemWriter itemWriter) {
return stepBuilderFactory.get("stepSuppressionThese").<TheseModel, TheseModel>chunk(config.getChunk())
.listener(theseWriteListener)
.reader(itemReader)
Expand All @@ -260,8 +277,19 @@ public Step stepSupprimeThesesOuThematiquesDansES(@Qualifier("jdbcPagingDeleteRe

@Bean
public Step stepSupprimePersonnesDansES(@Qualifier("jdbcPagingDeleteReader") JdbcPagingDeleteReader itemReader,
@Qualifier("supprimerThesesPersonneProcessor") ItemProcessor itemProcessor) {
return stepBuilderFactory.get("stepSupprimePersonnes").<TheseModel, TheseModel>chunk(config.getChunk())
@Qualifier("supprimerThesesPersonneProcessor") ItemProcessor itemProcessor) {
return stepBuilderFactory.get("stepSupprimePersonnesDansES").<TheseModel, TheseModel>chunk(1)
.listener(theseWriteListener)
.reader(itemReader)
.processor(itemProcessor)
.taskExecutor(taskExecutor())
.build();
}

@Bean
public Step stepAjouterPersonnesDansES(JdbcPagingCustomReader itemReader,
@Qualifier("ajouterThesesPersonnesProcessor") ItemProcessor itemProcessor) {
return stepBuilderFactory.get("stepAjouterPersonnesDansES").chunk(1)
.listener(theseWriteListener)
.reader(itemReader)
.processor(itemProcessor)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package fr.abes.theses_batch_indexation.processor;

import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import fr.abes.theses_batch_indexation.configuration.ElasticClient;
import fr.abes.theses_batch_indexation.database.TheseModel;
import fr.abes.theses_batch_indexation.dto.personne.PersonneMapee;
import fr.abes.theses_batch_indexation.dto.personne.PersonneModelES;
import fr.abes.theses_batch_indexation.dto.personne.PersonneModelESAvecId;
import fr.abes.theses_batch_indexation.model.oaisets.Set;
import fr.abes.theses_batch_indexation.model.tef.Mets;
import fr.abes.theses_batch_indexation.utils.MappingJobName;
import fr.abes.theses_batch_indexation.utils.PersonneCacheUtils;
import fr.abes.theses_batch_indexation.utils.XMLJsonMarshalling;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Spring Batch {@link ItemProcessor} used by the "ajoutPersonnesDansES" job.
 * <p>
 * For each thesis read by the step, it rebuilds the person cache table in the
 * database: it extracts the thesis' persons from its TEF/METS document, fetches
 * those persons from Elasticsearch, re-maps every thesis they reference, upserts
 * the resulting person documents into the cache table, then removes cached
 * persons no longer attached to the thesis. A later step of the job pushes the
 * cache table content to Elasticsearch.
 * <p>
 * NOTE(review): this processor keeps mutable state in instance fields
 * ({@code ppnList}, {@code nntSet}) — it is NOT thread-safe, and {@code nntSet}
 * is never cleared between {@code process()} calls, so thesis ids accumulate
 * across items; confirm both are intended (the step runs with chunk size 1).
 */
@Slf4j
@Component
public class AjouterThesesPersonnesProcessor implements ItemProcessor<TheseModel, TheseModel>, StepExecutionListener {

    // Maps job names to the ES index / cache table they target.
    MappingJobName mappingJobName = new MappingJobName();

    // TEF/METS XML un-marshaller, injected through the constructor.
    private final XMLJsonMarshalling marshall;

    // Target Elasticsearch index name, resolved from the job name in beforeStep().
    String nomIndex;

    // Name of the person cache table in the database (from configuration).
    @Value("${table.personne.name}")
    private String tablePersonneName;

    // PPNs (IdRef identifiers) of the persons of the thesis currently processed.
    List<String> ppnList = new ArrayList<>();

    // Ids of all theses to re-index; NOTE(review): never reset between items.
    java.util.Set<String> nntSet = new HashSet<>();

    // OAI sets loaded by a previous step, read from the job execution context.
    List<Set> oaiSets;

    // Helper around the person cache table; re-created in beforeStep() with real dependencies.
    PersonneCacheUtils personneCacheUtils = new PersonneCacheUtils();

    private final JdbcTemplate jdbcTemplate;

    public AjouterThesesPersonnesProcessor(XMLJsonMarshalling marshall, JdbcTemplate jdbcTemplate) {
        this.marshall = marshall;
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Resolves the target index name from the job name and initialises the
     * person-cache helper with the JDBC template, table name and index name.
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        nomIndex = mappingJobName.getNomIndexES().get(jobExecution.getJobInstance().getJobName());
        this.oaiSets = (List<Set>) jobExecution.getExecutionContext().get("oaiSets");

        this.personneCacheUtils = new PersonneCacheUtils(
                jdbcTemplate,
                tablePersonneName,
                nomIndex
        );
    }

    /** Returning null leaves the step's exit status unchanged. */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    /**
     * Rebuilds the person cache for the given thesis (see class Javadoc for the
     * overall algorithm), then commits the changes.
     *
     * @param theseModel the thesis being added
     * @return the same thesis, passed through to the writer
     * @throws Exception on XML un-marshalling or database errors
     */
    @Override
    public TheseModel process(TheseModel theseModel) throws Exception {

        // Re-initialisation of the cache table in the database (hence no multi-threading possible)
        personneCacheUtils.initialisePersonneCacheBDD();

        // TODO: look up the persons that reference this thesis in their list and remove persons without an NNT

        // Extract the list of persons from the thesis' TEF document
        List<PersonneModelES> personnesTef = getPersonnesModelESFromTef(theseModel.getId());

        // Keep only the PPNs of persons that have an IdRef identifier
        ppnList = personnesTef.stream().filter(PersonneModelES::isHas_idref).map(PersonneModelES::getPpn)
                .collect(Collectors.toList());

        // Fetch those persons from Elasticsearch
        List<PersonneModelES> personnesES = getPersonnesModelESFromES(ppnList);

        // Collect the ids of every thesis referenced by those persons
        personnesES.stream().map(PersonneModelES::getTheses_id).forEach(nntSet::addAll);

        // Add the id of the thesis being indexed
        // TODO: check that a subject id is not being turned into an NNT; in that case the subject id must be removed from nntSet
        nntSet.add(theseModel.getId());

        // Re-index the list of theses: fetch them from the database with the JdbcTemplate
        List<TheseModel> theseModels = personneCacheUtils.getTheses(nntSet);

        for (TheseModel theseModelToAdd : theseModels) {
            // Map each thesis' METS document to its list of persons
            // NOTE(review): getBytes() uses the platform default charset — confirm it matches the stored document encoding
            Mets mets = marshall.chargerMets(new ByteArrayInputStream(theseModelToAdd.getDoc().getBytes()));
            PersonneMapee personneMapee = new PersonneMapee(mets, theseModelToAdd.getId(), oaiSets);
            theseModelToAdd.setPersonnes(personneMapee.getPersonnes());
        }

        // Upsert every mapped person into the database cache table
        for (TheseModel theseModelToAddBdd : theseModels) {
            for (PersonneModelES personneModelES : theseModelToAddBdd.getPersonnes()) {
                if (personneCacheUtils.estPresentDansBDD(personneModelES.getPpn())) {
                    personneCacheUtils.updatePersonneDansBDD(personneModelES);
                } else {
                    personneCacheUtils.ajoutPersonneDansBDD(personneModelES);
                }
            }
        }

        // Clean the person cache table of persons whose PPN is not in ppnList
        List<PersonneModelES> personneModelEsEnBDD = personneCacheUtils.getAllPersonneModelBDD();

        personneModelEsEnBDD.forEach(p -> {
            if (p.isHas_idref() && !ppnList.contains(p.getPpn())) {
                try {
                    personneCacheUtils.deletePersonneBDD(p.getPpn());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        jdbcTemplate.execute("commit");
        // Reloading from the database to ES is done by a later step of the job

        return theseModel;
    }

    /**
     * Looks each PPN up in the Elasticsearch person index (term query on _id)
     * and returns the matching person documents.
     *
     * @param ppnList PPNs to look up
     * @return the person documents found in ES (absent PPNs are skipped)
     */
    private List<PersonneModelES> getPersonnesModelESFromES(List<String> ppnList) {
        List<PersonneModelES> personneModelES = new ArrayList<>();

        ppnList.forEach(p -> {
            SearchResponse<PersonneModelES> response = null;
            try {
                TermQuery termQuery = QueryBuilders.term().field("_id").value(p).build();
                Query query = new Query.Builder().term(termQuery).build();

                response = ElasticClient.getElasticsearchClient().search(s -> s
                        .index(nomIndex.toLowerCase())
                        .query(query),
                        PersonneModelES.class
                );
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            // NOTE(review): hits().total() can be null when track_total_hits is disabled — confirm the client/index settings
            if (response.hits().total().value() > 0) {
                personneModelES.addAll(response.hits().hits().stream().map(per -> per.source()
                ).collect(Collectors.toList()));
            }

        });

        return personneModelES;
    }

    /**
     * Loads the thesis with the given id from the database and extracts its
     * persons from the TEF/METS document.
     *
     * @param id thesis identifier (NNT or subject id)
     * @return the persons declared in the thesis document
     * @throws Exception on un-marshalling errors; note getTheses(...).get(0)
     *                   throws if the id matches no row — assumed to exist here
     */
    private List<PersonneModelES> getPersonnesModelESFromTef(String id) throws Exception {

        java.util.Set<String> nnt = new HashSet<>();
        nnt.add(id);
        TheseModel theseModelToAdd = personneCacheUtils.getTheses(nnt).get(0);

        // NOTE(review): same platform-default-charset concern as in process()
        Mets mets = marshall.chargerMets(new ByteArrayInputStream(theseModelToAdd.getDoc().getBytes()));
        PersonneMapee personneMapee = new PersonneMapee(mets, theseModelToAdd.getId(), oaiSets);

        return personneMapee.getPersonnes();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public TheseModel process(TheseModel theseModel) throws Exception {

// Ré-indexer la liste des thèses
// Récupérer les theses avec JDBCTemplate
List<TheseModel> theseModels = getTheses(nntSet);
List<TheseModel> theseModels = personneCacheUtils.getTheses(nntSet);

for (TheseModel theseModelToAdd : theseModels) {
// Utiliser PersonneMappee
Expand Down Expand Up @@ -160,15 +160,7 @@ private void deleteAllPersonneES(List<String> ppns) {
});
}

private List<TheseModel> getTheses(java.util.Set<String> nntSet) {
if (nntSet.isEmpty()) {
return new ArrayList<>();
}
String nnts = nntSet.stream().map(i -> "'" + i + "', ").reduce(String::concat).get();
nnts = nnts.substring(0, nnts.lastIndexOf("', ") + 1);

return jdbcTemplate.query("select * from Document where nnt in (" + nnts + ")", new TheseRowMapper());
}

private void deletePersonnesSansPPN(List<PersonneModelESAvecId> personnes) {
personnes.forEach(p ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public MappingJobName() {
nomTableES.put("indexationThesesDansES", TableIndexationES.indexation_es_these);
nomTableES.put("indexationPersonnesDansES", TableIndexationES.indexation_es_personne);
nomTableES.put("indexationPersonnesDeBddVersES", TableIndexationES.indexation_es_personne);
nomTableES.put("ajoutPersonnesDansES", TableIndexationES.indexation_es_personne);
nomTableES.put("indexationRecherchePersonnesDansES", TableIndexationES.indexation_es_recherche_personne);
nomTableES.put("indexationThematiquesDansES", TableIndexationES.indexation_es_thematique);
nomTableES.put("suppressionThesesDansES", TableIndexationES.suppression_es_these);
Expand All @@ -29,6 +30,7 @@ public MappingJobName() {
nomIndexES.put("indexationThesesDansES", "theses");
nomIndexES.put("indexationPersonnesDansES", "personnes");
nomIndexES.put("indexationPersonnesDeBddVersES", "personnes");
nomIndexES.put("ajoutPersonnesDansES", "personnes");
nomIndexES.put("indexationRecherchePersonnesDansES", "recherche_personnes");
nomIndexES.put("indexationThematiquesDansES", "thematiques");
nomIndexES.put("suppressionThesesDansES", "theses");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import fr.abes.theses_batch_indexation.database.TheseModel;
import fr.abes.theses_batch_indexation.database.TheseRowMapper;
import fr.abes.theses_batch_indexation.dto.personne.PersonneModelES;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.jdbc.core.JdbcTemplate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -122,6 +125,18 @@ public boolean deletePersonneBDD(String ppn) throws IOException {
}
}

/**
 * Loads the theses whose identifier matches any of the given ids, either as an
 * NNT or as a subject number (numsujet).
 * <p>
 * Fix: builds the SQL IN-list with {@code Collectors.joining} instead of the
 * fragile {@code reduce} + {@code substring} trimming, and adds the missing
 * space before {@code or} (the original concatenation produced {@code ")or"}).
 * <p>
 * NOTE(review): ids are concatenated into the SQL text; they come from the
 * database/TEF documents here, but a parameterized IN-list
 * (NamedParameterJdbcTemplate) would be safer against injection.
 *
 * @param nntSet thesis identifiers (NNTs and/or subject ids); may be empty
 * @return matching rows from the Document table; empty list when nntSet is empty
 */
public List<TheseModel> getTheses(java.util.Set<String> nntSet) {
    if (nntSet.isEmpty()) {
        return new ArrayList<>();
    }
    // Quoted, comma-separated IN-list: 'id1', 'id2', ...
    String nnts = nntSet.stream()
            .map(id -> "'" + id + "'")
            .collect(Collectors.joining(", "));

    return jdbcTemplate.query(
            "select * from Document where nnt in (" + nnts + ") or numsujet in (" + nnts + ")",
            new TheseRowMapper());
}

public static PersonneModelES mapperJson(String json) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, PersonneModelES.class);
Expand Down

0 comments on commit 34b03ef

Please sign in to comment.