From 4d8e944e603f6bc6c57f40d71f03cd9fe932bb3c Mon Sep 17 00:00:00 2001
From: Parth Kshirsagar
Date: Fri, 4 Oct 2024 08:55:36 -0500
Subject: [PATCH 1/3] Make runjobflowrequest synchronous

---
 .../datapullclient/process/DataPullTask.java | 54 ++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)

diff --git a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
index d87b4b4..aea1628 100644
--- a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
+++ b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
@@ -370,9 +370,16 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f
             request.withBootstrapActions(bsConfig);
         }
         RunJobFlowResult result=emr.runJobFlow(request);
+        // Wait for the cluster to be ready
+        boolean isClusterReady = waitForClusterReady(emr, result.getJobFlowId());
+
+        if (!isClusterReady) {
+            String errorMessage = "EMR cluster failed to start. Aborting the data pull task.";
+            DataPullTask.log.error(errorMessage);
+            throw new RuntimeException(errorMessage);
+        }
         ListStepsResult steps = emr.listSteps(new ListStepsRequest().withClusterId(result.getJobFlowId()));
         StepSummary step = steps.getSteps().get(0);
-        ;
         DescribeStepRequest ds = new DescribeStepRequest();
         ds.withClusterId(result.getJobFlowId());
         ds.withStepId(step.getId());
@@ -382,6 +389,51 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f
         return result;
     }

+    private boolean waitForClusterReady(AmazonElasticMapReduce emrClient, String clusterId) {
+        DescribeClusterRequest describeRequest = new DescribeClusterRequest().withClusterId(clusterId);
+        int maxRetries = 60;
+        int retryIntervalSeconds = 30;
+        int retries = 0;
+
+        while (retries < maxRetries) {
+            try {
+                DescribeClusterResult describeResult = emrClient.describeCluster(describeRequest);
+                ClusterStatus status = describeResult.getCluster().getStatus();
+                String state = status.getState();
+                DataPullTask.log.info("Cluster {} is in state {}", clusterId, state);
+
+                switch (state) {
+                    case "WAITING":
+                    case "RUNNING":
+                        DataPullTask.log.info("Cluster {} is ready.", clusterId);
+                        return true;
+                    case "TERMINATED_WITH_ERRORS":
+                    case "TERMINATED":
+                        String reason = status.getStateChangeReason().getMessage();
+                        DataPullTask.log.error("Cluster {} failed to start. Reason: {}", clusterId, reason);
+                        return false;
+                    default:
+                        // Cluster is still starting up
+                        break;
+                }
+
+                // Wait before polling again
+                Thread.sleep(retryIntervalSeconds * 1000);
+                retries++;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                DataPullTask.log.error("Interrupted while waiting for cluster to start.", e);
+                return false;
+            } catch (Exception e) {
+                DataPullTask.log.error("Error while checking cluster status.", e);
+                return false;
+            }
+        }
+
+        DataPullTask.log.error("Cluster {} did not start within the expected time.", clusterId);
+        return false;
+    }
+
     private JobFlowInstancesConfig getJobFlowInstancesConfig(EMRProperties emrProperties,
                                                             ClusterProperties clusterProperties,
                                                             DataPullProperties dataPullProperties) {

From 1c077c4df4e63c325c75561b53a4388334250f00 Mon Sep 17 00:00:00 2001
From: Parth Kshirsagar
Date: Wed, 9 Oct 2024 11:42:50 -0500
Subject: [PATCH 2/3] update the synchronous runjobflow with email notification

---
 api/pom.xml                                   | 12 +++++
 .../config/DataPullClientConfig.java          |  4 +-
 .../process/DataPullRequestProcessor.java     |  2 +-
 .../datapullclient/process/DataPullTask.java  | 51 ++++++++++++++++++-
 4 files changed, 65 insertions(+), 4 deletions(-)

diff --git a/api/pom.xml b/api/pom.xml
index ed614a3..2d78afa 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -110,6 +110,12 @@
             <version>2.7.0</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.12.0</version>
+        </dependency>
         <dependency>
             <groupId>org.everit.json</groupId>
             <artifactId>org.everit.json.schema</artifactId>
@@ -131,6 +137,12 @@
         </dependency>
+        <dependency>
+            <groupId>DataMigrationTool</groupId>
+            <artifactId>DataMigrationFramework</artifactId>
+            <version>1.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
diff --git a/api/src/main/java/com/homeaway/datapullclient/config/DataPullClientConfig.java b/api/src/main/java/com/homeaway/datapullclient/config/DataPullClientConfig.java
index aa57137..b793cf4 100644
--- a/api/src/main/java/com/homeaway/datapullclient/config/DataPullClientConfig.java
+++ b/api/src/main/java/com/homeaway/datapullclient/config/DataPullClientConfig.java
@@ -52,8 +52,8 @@ public class DataPullClientConfig {

     @Bean
     @Scope("prototype")
-    public DataPullTask getTask(String taskId, String json, String jksFile, List<String> subnets, Map<String, List<StepConfig>> stepPipelineMap) {
-        return new DataPullTask(taskId, json, jksFile, subnets, stepPipelineMap);
+    public DataPullTask getTask(String taskId, String creator, String json, String jksFile, List<String> subnets, Map<String, List<StepConfig>> stepPipelineMap) {
+        return new DataPullTask(taskId, creator, json, jksFile, subnets, stepPipelineMap);
     }

     @Bean
diff --git a/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java b/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
index 1c01192..ad9ecf1 100644
--- a/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
+++ b/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
@@ -351,7 +351,7 @@ private Boolean createBootstrapScript(Migration[] myObjects, String bootstrapFil
     private DataPullTask createDataPullTask(String fileS3Path, String jksFilePath, ClusterProperties properties, String jobName, String creator, String customJarFilePath, Boolean haveBootstrapAction) {
         String creatorTag = String.join(" ", Arrays.asList(creator.split(",|;")));
-        DataPullTask task = config.getTask(jobName, fileS3Path, jksFilePath,rotateSubnets(),getStepForPipeline()).withClusterProperties(properties).withCustomJar(customJarFilePath).haveBootstrapAction(haveBootstrapAction)
+        DataPullTask task = config.getTask(jobName, creator, fileS3Path, jksFilePath,rotateSubnets(),getStepForPipeline()).withClusterProperties(properties).withCustomJar(customJarFilePath).haveBootstrapAction(haveBootstrapAction)
                 .addTag("Creator", creatorTag).addTag("Env", Objects.toString(properties.getAwsEnv(), env)).addTag("Name", jobName)
                 .addTag("AssetProtectionLevel", "99").addTag("ComponentInfo", properties.getComponentInfo())
                 .addTag("Portfolio", properties.getPortfolio()).addTag("Product", properties.getProduct()).addTag("Team", properties.getTeam()).addTag("tool", "datapull")
diff --git a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
index aea1628..127c3e2 100644
--- a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
+++ b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
@@ -18,15 +18,23 @@

 import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
 import com.amazonaws.services.elasticmapreduce.model.*;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.homeaway.datapullclient.config.DataPullClientConfig;
 import com.homeaway.datapullclient.config.DataPullProperties;
 import com.homeaway.datapullclient.config.EMRProperties;
 import com.homeaway.datapullclient.input.ClusterProperties;
+import config.AppConfig;
+import core.Controller;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;

+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
@@ -42,6 +50,8 @@ public class DataPullTask implements Runnable {

     private final String jsonS3Path;

+    private final String creator;
+
     private final String s3FilePath;

     private final String jksS3Path;
@@ -59,7 +69,8 @@ public class DataPullTask implements Runnable {
     private final List<String> subnets ;
     private final Map<String, List<StepConfig>> stepPipelineMap;

-    public DataPullTask(final String taskId, final String s3File, final String jksFilePath, final List<String> subnets, final Map<String, List<StepConfig>> stepPipelineMap) {
+    public DataPullTask(final String taskId, final String creator, final String s3File, final String jksFilePath, final List<String> subnets, final Map<String, List<StepConfig>> stepPipelineMap) {
+        this.creator = creator;
         s3FilePath = s3File;
         this.taskId = taskId;
         jsonS3Path = this.s3FilePath + ".json";
@@ -376,6 +387,7 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f
         if (!isClusterReady) {
             String errorMessage = "EMR cluster failed to start. Aborting the data pull task.";
             DataPullTask.log.error(errorMessage);
+            sendEmail(errorMessage, creator);
             throw new RuntimeException(errorMessage);
         }
         ListStepsResult steps = emr.listSteps(new ListStepsRequest().withClusterId(result.getJobFlowId()));
@@ -389,6 +401,42 @@ private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, f
         return result;
     }

+
+    private void sendEmail(String message, String creator) {
+        try {
+            // Create ObjectMapper for YAML
+            ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
+
+            // Load application.yml from resources
+            InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("application.yml");
+
+            // Read YAML content into JsonNode
+            JsonNode applicationConf = yamlMapper.readTree(inputStream);
+
+            AppConfig config = new AppConfig(applicationConf);
+            String pipelineName = clusterProperties.getPipelineName();
+
+            String env = clusterProperties.getAwsEnv();
+            String applicationId = clusterProperties.getApplication();
+            String subject = "Data Pull job failed for the pipeline " + pipelineName + " in " + env + " environment";
+
+            StringBuilder reportbodyHtml = new StringBuilder();
+            reportbodyHtml.append("<html><body><h2>Errors!</h2><table><tr><td>")
+                    .append(Instant.now().toString())
+                    .append("</td><td>")
+                    .append(message)
+                    .append("</td></tr></table></body></html>");
+            Controller controllerInstance = new Controller(config, pipelineName);
+            String htmlContent = controllerInstance.neatifyReportHtml(reportbodyHtml.toString(), true, true);
+            controllerInstance.SendEmail(creator, htmlContent, applicationId,
+                    pipelineName, env, subject, "");
+        }
+        catch (IOException e) {
+            DataPullTask.log.error("Error while sending email", e);
+        }
+
+    }
+
     private boolean waitForClusterReady(AmazonElasticMapReduce emrClient, String clusterId) {
         DescribeClusterRequest describeRequest = new DescribeClusterRequest().withClusterId(clusterId);
         int maxRetries = 60;
@@ -407,6 +455,7 @@ private boolean waitForClusterReady(AmazonElasticMapReduce emrClient, String clu
             case "RUNNING":
                 DataPullTask.log.info("Cluster {} is ready.", clusterId);
                 return true;
+            case "TERMINATING":
             case "TERMINATED_WITH_ERRORS":
             case "TERMINATED":
                 String reason = status.getStateChangeReason().getMessage();

From c011b5b647fb2395d196b96011b6df4c411b81aa Mon Sep 17 00:00:00 2001
From: Parth Kshirsagar
Date: Thu, 17 Oct 2024 15:25:16 -0500
Subject: [PATCH 3/3] remove terminating as this condition is not needed for
 synchronous call

---
 .../java/com/homeaway/datapullclient/process/DataPullTask.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
index 127c3e2..34b118d 100644
--- a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
+++ b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
@@ -455,7 +455,6 @@ private boolean waitForClusterReady(AmazonElasticMapReduce emrClient, String clu
             case "RUNNING":
                 DataPullTask.log.info("Cluster {} is ready.", clusterId);
                 return true;
-            case "TERMINATING":
             case "TERMINATED_WITH_ERRORS":
             case "TERMINATED":
                 String reason = status.getStateChangeReason().getMessage();