Skip to content

Commit

Permalink
testing iosas integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Harry0589 committed Mar 25, 2024
1 parent 0b4c467 commit c614c21
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,21 @@ public enum EventOutcome {

SCHOOL_CREATED_IN_SPM,

SCHOOL_UPDATED_IN_SPM,


SCHOOL_WRITE_SKIPPED_IN_SPM_FOR_DATES,

SCHOOL_CREATED_IN_IOSAS,

SCHOOL_UPDATED_IN_IOSAS,


SCHOOL_CREATED_IN_ISFS,

SCHOOL_UPDATED_IN_ISFS,


DISTRICT_UPDATED,

DISTRICT_CREATED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,17 @@ public enum EventType {

CREATE_SCHOOL_IN_SPM,

UPDATE_SCHOOL_IN_SPM,


CREATE_SCHOOL_IN_IOSAS,

UPDATE_SCHOOL_IN_IOSAS,

CREATE_SCHOOL_IN_ISFS,

UPDATE_SCHOOL_IN_ISFS,

UPDATE_DISTRICT,

CREATE_DISTRICT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public enum SagaEnum {
* Pen replication school create saga enum.
*/
PEN_REPLICATION_SCHOOL_CREATE_SAGA("PEN_REPLICATION_SCHOOL_CREATE_SAGA"),
/**
* Pen replication school update saga enum.
*/
PEN_REPLICATION_SCHOOL_UPDATE_SAGA("PEN_REPLICATION_SCHOOL_UPDATE_SAGA"),

/**
* Pen replication district create saga enum.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package ca.bc.gov.educ.api.pen.replication.orchestrator;

import ca.bc.gov.educ.api.pen.replication.constants.IndependentSchoolSystem;
import ca.bc.gov.educ.api.pen.replication.constants.SagaEnum;
import ca.bc.gov.educ.api.pen.replication.constants.SagaTopicsEnum;
import ca.bc.gov.educ.api.pen.replication.messaging.MessagePublisher;
import ca.bc.gov.educ.api.pen.replication.model.Saga;
import ca.bc.gov.educ.api.pen.replication.model.SagaEvent;
import ca.bc.gov.educ.api.pen.replication.model.SchoolMasterEntity;
import ca.bc.gov.educ.api.pen.replication.orchestrator.base.BaseOrchestrator;
import ca.bc.gov.educ.api.pen.replication.rest.RestUtils;
import ca.bc.gov.educ.api.pen.replication.service.SagaService;
import ca.bc.gov.educ.api.pen.replication.service.SchoolCreateService;
import ca.bc.gov.educ.api.pen.replication.struct.Event;
import ca.bc.gov.educ.api.pen.replication.struct.saga.SchoolUpdateSagaData;
import ca.bc.gov.educ.api.pen.replication.util.JsonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.stereotype.Component;

import static ca.bc.gov.educ.api.pen.replication.constants.EventOutcome.*;
import static ca.bc.gov.educ.api.pen.replication.constants.EventType.*;


@Component
@Slf4j
public class SchoolUpdateOrchestrator extends BaseOrchestrator<SchoolUpdateSagaData> {

public static final String RESPONDED_VIA_NATS_TO_FOR_EVENT = "responded via NATS to {} for {} Event. :: {}";
private final RestUtils restUtils;
private final SchoolCreateService schoolCreateService;

protected SchoolUpdateOrchestrator(final SagaService sagaService, final MessagePublisher messagePublisher, final EntityManagerFactory entityManagerFactory, final RestUtils restUtils, SchoolCreateService schoolCreateService) {
super(entityManagerFactory, sagaService, messagePublisher, SchoolUpdateSagaData.class, SagaEnum.PEN_REPLICATION_SCHOOL_CREATE_SAGA, SagaTopicsEnum.PEN_REPLICATION_SCHOOL_CREATE_SAGA_TOPIC);
this.restUtils = restUtils;
this.schoolCreateService = schoolCreateService;
}
@Override
public void populateStepsToExecuteMap() {
this.stepBuilder()
.begin( UPDATE_SCHOOL_IN_SPM, this::updateSchoolInSPM)
.step(UPDATE_SCHOOL_IN_SPM, SCHOOL_UPDATED_IN_SPM, UPDATE_SCHOOL_IN_IOSAS, this::updateSchoolInIOSAS)
.step(UPDATE_SCHOOL_IN_SPM, SCHOOL_WRITE_SKIPPED_IN_SPM_FOR_DATES, UPDATE_SCHOOL_IN_IOSAS, this::updateSchoolInIOSAS)
.step(UPDATE_SCHOOL_IN_IOSAS, SCHOOL_UPDATED_IN_IOSAS, UPDATE_SCHOOL_IN_ISFS, this::updateSchoolInISFS)
.end(UPDATE_SCHOOL_IN_ISFS, SCHOOL_UPDATED_IN_ISFS);
}

private void updateSchoolInSPM(final Event event, final Saga saga, final SchoolUpdateSagaData schoolUpdateSagaData) throws JsonProcessingException {
saga.setSagaState(UPDATE_SCHOOL_IN_SPM.toString());
final SagaEvent eventStates = this.createEventState(saga, event.getEventType(), event.getEventOutcome(), event.getEventPayload());
this.getSagaService().updateAttachedSagaWithEvents(saga, eventStates);
var school = schoolUpdateSagaData.getSchool();

SchoolMasterEntity newSchoolMaster = schoolCreateService.saveSchool(school);
Event nextEvent = null;
if(newSchoolMaster != null) {
nextEvent = Event.builder().sagaId(saga.getSagaId())
.eventType(UPDATE_SCHOOL_IN_SPM)
.eventOutcome(SCHOOL_UPDATED_IN_SPM)
.eventPayload(JsonUtil.getJsonStringFromObject(newSchoolMaster))
.build();
}else{
nextEvent = Event.builder().sagaId(saga.getSagaId())
.eventType(UPDATE_SCHOOL_IN_SPM)
.eventOutcome(SCHOOL_WRITE_SKIPPED_IN_SPM_FOR_DATES)
.eventPayload("No update required")
.build();
}
this.postMessageToTopic(this.getTopicToSubscribe().getCode(), nextEvent); // this will make it async and use pub/sub flow even though it is sending message to itself
log.info(RESPONDED_VIA_NATS_TO_FOR_EVENT, this.getTopicToSubscribe(), UPDATE_SCHOOL_IN_SPM, saga.getSagaId());
}

private void updateSchoolInIOSAS(final Event event, final Saga saga, final SchoolUpdateSagaData schoolCreateSagaData) throws JsonProcessingException {
saga.setSagaState(UPDATE_SCHOOL_IN_IOSAS.toString());
final SagaEvent eventStates = this.createEventState(saga, event.getEventType(), event.getEventOutcome(), event.getEventPayload());
this.getSagaService().updateAttachedSagaWithEvents(saga, eventStates);

restUtils.createOrUpdateSchoolInIndependentSchoolSystem(schoolCreateSagaData.getSchool(), IndependentSchoolSystem.IOSAS);

val nextEvent = Event.builder().sagaId(saga.getSagaId())
.eventType(UPDATE_SCHOOL_IN_IOSAS)
.eventOutcome(SCHOOL_CREATED_IN_IOSAS)
.eventPayload(JsonUtil.getJsonStringFromObject(schoolCreateSagaData.getSchool()))
.build();
this.postMessageToTopic(this.getTopicToSubscribe().getCode(), nextEvent); // this will make it async and use pub/sub flow even though it is sending message to itself
log.info(RESPONDED_VIA_NATS_TO_FOR_EVENT, this.getTopicToSubscribe(), UPDATE_SCHOOL_IN_IOSAS, saga.getSagaId());
}

private void updateSchoolInISFS(final Event event, final Saga saga, final SchoolUpdateSagaData schoolCreateSagaData) throws JsonProcessingException {
saga.setSagaState(UPDATE_SCHOOL_IN_ISFS.toString());
final SagaEvent eventStates = this.createEventState(saga, event.getEventType(), event.getEventOutcome(), event.getEventPayload());
this.getSagaService().updateAttachedSagaWithEvents(saga, eventStates);

restUtils.createOrUpdateSchoolInIndependentSchoolSystem(schoolCreateSagaData.getSchool(), IndependentSchoolSystem.ISFS);

val nextEvent = Event.builder().sagaId(saga.getSagaId())
.eventType(UPDATE_SCHOOL_IN_ISFS)
.eventOutcome(SCHOOL_UPDATED_IN_ISFS)
.eventPayload(JsonUtil.getJsonStringFromObject(schoolCreateSagaData.getSchool()))
.build();
this.postMessageToTopic(this.getTopicToSubscribe().getCode(), nextEvent); // this will make it async and use pub/sub flow even though it is sending message to itself
log.info(RESPONDED_VIA_NATS_TO_FOR_EVENT, this.getTopicToSubscribe(), UPDATE_SCHOOL_IN_ISFS, saga.getSagaId());
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import ca.bc.gov.educ.api.pen.replication.struct.IndependentAuthority;
import ca.bc.gov.educ.api.pen.replication.struct.School;
import ca.bc.gov.educ.api.pen.replication.struct.saga.AuthorityCreateSagaData;
import ca.bc.gov.educ.api.pen.replication.struct.saga.AuthorityUpdateSagaData;
import ca.bc.gov.educ.api.pen.replication.struct.saga.SchoolCreateSagaData;
import ca.bc.gov.educ.api.pen.replication.struct.saga.SchoolUpdateSagaData;
import ca.bc.gov.educ.api.pen.replication.util.JsonUtil;
import io.nats.client.Message;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -23,8 +25,7 @@
import java.util.List;
import java.util.Map;

import static ca.bc.gov.educ.api.pen.replication.constants.SagaEnum.PEN_REPLICATION_AUTHORITY_CREATE_SAGA;
import static ca.bc.gov.educ.api.pen.replication.constants.SagaEnum.PEN_REPLICATION_SCHOOL_CREATE_SAGA;
import static ca.bc.gov.educ.api.pen.replication.constants.SagaEnum.*;


/**
Expand Down Expand Up @@ -78,6 +79,18 @@ public void handleChoreographyEvent(@NonNull final ChoreographedEvent choreograp
log.info("Acknowledged CREATE_AUTHORITY event to Jet Stream...");
orchestratorCreateAuthority.startSaga(saga);
break;
case UPDATE_AUTHORITY:
log.info("Persisting UPDATE_AUTHORITY event record for Saga processing :: {} ", choreographedEvent);
val orchestratorUpdateAuthority = this.sagaEnumOrchestratorMap.get(PEN_REPLICATION_AUTHORITY_CREATE_SAGA);
val updateAuthority = JsonUtil.getJsonObjectFromString(IndependentAuthority.class, choreographedEvent.getEventPayload());
final AuthorityUpdateSagaData authorityUpdateSagaData = AuthorityUpdateSagaData.builder()
.independentAuthority(updateAuthority)
.build();
val updateAuthoritySaga = this.sagaService.persistSagaData(orchestratorUpdateAuthority.getSagaName().getCode(), ApplicationProperties.API_NAME, JsonUtil.getJsonStringFromObject(authorityUpdateSagaData), choreographedEvent.getEventID());
message.ack();
log.info("Acknowledged CREATE_AUTHORITY event to Jet Stream...");
orchestratorUpdateAuthority.startSaga(updateAuthoritySaga);
break;
case CREATE_SCHOOL:
log.info("Persisting CREATE_SCHOOL event record for Saga processing :: {} ", choreographedEvent);
val orchestratorCreateSchool = this.sagaEnumOrchestratorMap.get(PEN_REPLICATION_SCHOOL_CREATE_SAGA);
Expand All @@ -90,6 +103,18 @@ public void handleChoreographyEvent(@NonNull final ChoreographedEvent choreograp
log.info("Acknowledged CREATE_SCHOOL event to Jet Stream...");
orchestratorCreateSchool.startSaga(sagaCreateSchool);
break;
case UPDATE_SCHOOL:
log.info("Persisting UPDATE_SCHOOL event record for Saga processing :: {} ", choreographedEvent);
val orchestratorUpdateSchool = this.sagaEnumOrchestratorMap.get(PEN_REPLICATION_SCHOOL_UPDATE_SAGA);
val updateSchool = JsonUtil.getJsonObjectFromString(School.class, choreographedEvent.getEventPayload());
final SchoolUpdateSagaData schoolUpdateSagaData = SchoolUpdateSagaData.builder()
.school(updateSchool)
.build();
val sagaUpdateSchool = this.sagaService.persistSagaData(orchestratorUpdateSchool.getSagaName().getCode(), ApplicationProperties.API_NAME, JsonUtil.getJsonStringFromObject(orchestratorUpdateSchool), choreographedEvent.getEventID());
message.ack();
log.info("Acknowledged UPDATE_SCHOOL event to Jet Stream...");
orchestratorUpdateSchool.startSaga(sagaUpdateSchool);
break;
default:
log.info("Persisting event record for choreography processing :: {} ", choreographedEvent);
final var persistedEvent = this.choreographedEventPersistenceService.persistEventToDB(choreographedEvent);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ca.bc.gov.educ.api.pen.replication.struct.saga;

import ca.bc.gov.educ.api.pen.replication.struct.IndependentAuthority;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AuthorityUpdateSagaData implements Serializable {

private static final long serialVersionUID = 6922881826450048299L;

IndependentAuthority independentAuthority;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ca.bc.gov.educ.api.pen.replication.struct.saga;

import ca.bc.gov.educ.api.pen.replication.struct.School;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SchoolUpdateSagaData implements Serializable {

private static final long serialVersionUID = 6922881826450048290L;

School school;
}

0 comments on commit c614c21

Please sign in to comment.