diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 20c5c725..59159e8a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,14 +42,11 @@ jobs: run: | cd buildstockbatch python -m pip install --progress-bar off --upgrade pip - pip install .[dev] --progress-bar off - - name: Linting - run: | - cd buildstockbatch - # stop the build if there are Python syntax errors or undefined names - flake8 buildstockbatch --count --select=E9,F63,F7,F82 --show-source --statistics - # exit-zero treats all errors as warnings. - flake8 buildstockbatch --count --statistics --exit-zero + pip install .[dev,aws] --progress-bar off + - name: Black + uses: psf/black@stable + with: + src: "./buildstockbatch" - name: Run PyTest and Coverage run: | cd buildstockbatch @@ -84,5 +81,3 @@ jobs: with: name: documentation path: buildstockbatch/docs/_build/html/ - - uses: pre-commit-ci/lite-action@v1.0.1 - if: always() diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d5bdeb08..5fa30beb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,7 +6,7 @@ repos: - id: end-of-file-fixer exclude_types: ["csv","tsv"] - repo: https://github.com/psf/black-pre-commit-mirror - rev: 23.10.1 + rev: 24.1.1 hooks: - id: black language_version: python3.11 diff --git a/Dockerfile b/Dockerfile index e7938c65..27335de8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,24 +1,21 @@ -FROM nrel/openstudio:2.9.1 - -ENV LANG=C.UTF-8 LC_ALL=C.UTF-8 - -RUN sudo apt update && \ - sudo apt install -y wget build-essential checkinstall libreadline-gplv2-dev libncursesw5-dev libssl-dev \ - libsqlite3-dev tk-dev libgdbm-dev libc6-dev libbz2-dev libffi-dev zlib1g-dev - -RUN wget https://www.python.org/ftp/python/3.8.8/Python-3.8.8.tgz && \ - tar xzf Python-3.8.8.tgz && \ - cd Python-3.8.8 && \ - ./configure --enable-optimizations && \ - make altinstall && \ - rm -rf Python-3.8.8 && \ - rm -rf Python-3.8.8.tgz - -RUN sudo apt install -y -V ca-certificates lsb-release && \ - wget https://apache.bintray.com/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-archive-keyring-latest-$(lsb_release --codename --short).deb && \ - sudo apt install -y -V ./apache-arrow-archive-keyring-latest-$(lsb_release --codename --short).deb && \ - sudo apt update && \ - sudo apt install -y -V libarrow-dev libarrow-glib-dev libarrow-dataset-dev libparquet-dev libparquet-glib-dev +ARG OS_VER +FROM --platform=linux/amd64 nrel/openstudio:$OS_VER as buildstockbatch +RUN sudo apt update && sudo apt install -y python3-pip +RUN sudo -H pip install --upgrade pip COPY . /buildstock-batch/ -RUN python3.8 -m pip install /buildstock-batch +RUN python3 -m pip install "/buildstock-batch[aws]" + +# Base plus custom gems +FROM buildstockbatch as buildstockbatch-custom-gems +RUN sudo cp /buildstock-batch/Gemfile /var/oscli/ +# OpenStudio's docker image sets ENV BUNDLE_WITHOUT=native_ext +# https://github.com/NREL/docker-openstudio/blob/3.2.1/Dockerfile#L12 +# which overrides anything set via bundle config commands. +# Unset this so that bundle config commands work properly. 
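The new multi-stage Dockerfile defines an `OS_VER` build argument and two targets, `buildstockbatch` and `buildstockbatch-custom-gems`; the updated `AwsBatch.build_image` method later in this diff selects between them through docker-py. A minimal sketch of that selection, assuming docker-py is installed and using a placeholder image tag and OpenStudio version (both hypothetical, not values from this diff):

```python
# Sketch: choosing a Dockerfile stage with docker-py, mirroring the
# target/buildargs usage added to AwsBatch.build_image in this diff.
# The tag and OS_VER value are illustrative placeholders only.
import docker

client = docker.from_env()
use_custom_gems = True  # would come from cfg["baseline"]["custom_gems"]
stage = "buildstockbatch-custom-gems" if use_custom_gems else "buildstockbatch"

image, build_logs = client.images.build(
    path=".",                       # repo root containing the Dockerfile
    tag="nrel/buildstockbatch",     # placeholder tag
    target=stage,                   # multi-stage build target
    platform="linux/amd64",
    buildargs={"OS_VER": "3.7.0"},  # feeds ARG OS_VER; version is hypothetical
    rm=True,
)
for line in build_logs:
    msg = line.get("stream") or line.get("status")
    if msg:
        print(msg, end="")
```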
+RUN unset BUNDLE_WITHOUT +# Note the addition of 'set' in bundle config commands +RUN bundle config set git.allow_insecure true +RUN bundle config set path /var/oscli/gems/ +RUN bundle config set without 'test development native_ext' +RUN bundle install --gemfile /var/oscli/Gemfile diff --git a/buildstockbatch/aws/aws.py b/buildstockbatch/aws/aws.py index 9bdc894a..0b417c8d 100644 --- a/buildstockbatch/aws/aws.py +++ b/buildstockbatch/aws/aws.py @@ -10,10 +10,13 @@ :license: BSD-3 """ import argparse -from awsretry import AWSRetry import base64 import boto3 from botocore.exceptions import ClientError +from copy import deepcopy +import csv +from dask.distributed import Client +from dask_cloudprovider.aws import FargateCluster import gzip from joblib import Parallel, delayed import json @@ -22,22 +25,54 @@ import pathlib import random from s3fs import S3FileSystem +import shutil import tarfile import re +import tempfile import time +import tqdm import io -import zipfile +import yaml -from buildstockbatch.aws.awsbase import AwsJobBase from buildstockbatch.base import ValidationError +from buildstockbatch.aws.awsbase import AwsJobBase, boto_client_config from buildstockbatch.cloud.docker_base import DockerBatchBase -from buildstockbatch.utils import log_error_details, get_project_configuration +from buildstockbatch.utils import ( + log_error_details, + get_project_configuration, + get_bool_env_var, +) logger = logging.getLogger(__name__) +def backoff(thefunc, *args, **kwargs): + backoff_mult = 1.1 + delay = 3 + tries = 5 + error_patterns = [r"\w+.NotFound"] + while tries > 0: + try: + result = thefunc(*args, **kwargs) + except ClientError as error: + error_code = error.response["Error"]["Code"] + caught_error = False + for pat in error_patterns: + if re.search(pat, error_code): + logger.debug(f"{error_code}: Waiting and retrying in {delay} seconds") + caught_error = True + time.sleep(delay) + delay *= backoff_mult + tries -= 1 + break + if not caught_error: + raise error + else: + return result + + def upload_file_to_s3(*args, **kwargs): - s3 = boto3.client("s3") + s3 = boto3.client("s3", config=boto_client_config) s3.upload_file(*args, **kwargs) @@ -61,14 +96,25 @@ def filename_generator(): ) +def compress_file(in_filename, out_filename): + with gzip.open(str(out_filename), "wb") as f_out: + with open(str(in_filename), "rb") as f_in: + shutil.copyfileobj(f_in, f_out) + + +def calc_hash_for_file(filename): + with open(filename, "rb") as f: + return hashlib.sha256(f.read()).hexdigest() + + def copy_s3_file(src_bucket, src_key, dest_bucket, dest_key): - s3 = boto3.client("s3") + s3 = boto3.client("s3", config=boto_client_config) s3.copy({"Bucket": src_bucket, "Key": src_key}, dest_bucket, dest_key) class AwsBatchEnv(AwsJobBase): """ - Class to manage the AWS Batch environment and Step Function controller. + Class to manage the AWS Batch environment. 
""" def __init__(self, job_name, aws_config, boto3_session): @@ -79,14 +125,13 @@ def __init__(self, job_name, aws_config, boto3_session): """ super().__init__(job_name, aws_config, boto3_session) - self.batch = self.session.client("batch") - self.ec2 = self.session.client("ec2") - self.ec2r = self.session.resource("ec2") - self.emr = self.session.client("emr") - self.step_functions = self.session.client("stepfunctions") - self.aws_lambda = self.session.client("lambda") - self.s3 = self.session.client("s3") - self.s3_res = self.session.resource("s3") + self.batch = self.session.client("batch", config=boto_client_config) + self.ec2 = self.session.client("ec2", config=boto_client_config) + self.ec2r = self.session.resource("ec2", config=boto_client_config) + self.step_functions = self.session.client("stepfunctions", config=boto_client_config) + self.aws_lambda = self.session.client("lambda", config=boto_client_config) + self.s3 = self.session.client("s3", config=boto_client_config) + self.s3_res = self.session.resource("s3", config=boto_client_config) self.task_role_arn = None self.job_definition_arn = None @@ -95,66 +140,17 @@ def __init__(self, job_name, aws_config, boto3_session): self.service_role_arn = None self.instance_profile_arn = None self.job_queue_arn = None + self.s3_gateway_endpoint_id = None + self.prefix_list_id = None logger.propagate = False def __repr__(self): return super().__repr__() - def create_emr_lambda_roles(self): - """ - Create supporting IAM roles for Lambda support. - """ - - # EMR - - lambda_policy = { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Action": "logs:CreateLogGroup", - "Resource": f"arn:aws:logs:{self.region}:{self.account}:*", - }, - { - "Effect": "Allow", - "Action": ["logs:CreateLogStream", "logs:PutLogEvents"], - "Resource": [f"arn:aws:logs:{self.region}:{self.account}:log-group:/aws/lambda/launchemr:*"], - }, - { - "Effect": "Allow", - "Action": "elasticmapreduce:RunJobFlow", - "Resource": "*", - }, - { - "Effect": "Allow", - "Action": "iam:PassRole", - "Resource": [ - f"arn:aws:iam::{self.account}:role/EMR_DefaultRole", - f"arn:aws:iam::{self.account}:role/EMR_EC2_DefaultRole", - f"arn:aws:iam::{self.account}:role/EMR_AutoScaling_DefaultRole", - self.emr_job_flow_role_arn, - self.emr_service_role_arn, - ], - }, - { - "Effect": "Allow", - "Action": "s3:GetObject", - "Resource": [f"arn:aws:s3:::{self.s3_bucket}/*"], - }, - ], - } - - self.lambda_emr_job_step_execution_role_arn = self.iam_helper.role_stitcher( - self.lambda_emr_job_step_execution_role, - "lambda", - f"Lambda execution role for {self.lambda_emr_job_step_function_name}", - policies_list=[json.dumps(lambda_policy, indent=4)], - ) - def create_vpc(self): cidrs_in_use = set() - vpc_response = AWSRetry.backoff()(self.ec2.describe_vpcs)() + vpc_response = self.ec2.describe_vpcs() for vpc in vpc_response["Vpcs"]: cidrs_in_use.add(vpc["CidrBlock"]) for cidr_assoc in vpc["CidrBlockAssociationSet"]: @@ -171,7 +167,8 @@ def create_vpc(self): # Create the VPC - response = self.ec2.create_vpc( + response = backoff( + self.ec2.create_vpc, CidrBlock=self.vpc_cidr, AmazonProvidedIpv6CidrBlock=False, InstanceTenancy="default", @@ -179,20 +176,11 @@ def create_vpc(self): self.vpc_id = response["Vpc"]["VpcId"] logger.info(f"VPC {self.vpc_id} created") - while True: - try: - self.ec2.create_tags( - Resources=[self.vpc_id], - Tags=[{"Key": "Name", "Value": self.job_identifier}], - ) - break - except Exception as e: - if "InvalidVpcID.NotFound" in str(e): - logger.info("Cannot 
tag VPC. VPC not yet created. Sleeping...") - time.sleep(5) - else: - raise - + backoff( + self.ec2.create_tags, + Resources=[self.vpc_id], + Tags=self.get_tags_uppercase(Name=self.job_identifier), + ) # Find the default security group sec_response = self.ec2.describe_security_groups( @@ -205,7 +193,8 @@ def create_vpc(self): logger.info(f"Security group {self.batch_security_group} created for vpc/job.") - response = self.ec2.authorize_security_group_ingress( + response = backoff( + self.ec2.authorize_security_group_ingress, GroupId=self.batch_security_group, IpPermissions=[ { @@ -221,7 +210,8 @@ def create_vpc(self): # Create the private subnets - priv_response_1 = self.ec2.create_subnet( + priv_response_1 = backoff( + self.ec2.create_subnet, CidrBlock=self.priv_subnet_cidr_1, AvailabilityZone=f"{self.region}a", VpcId=self.vpc_id, @@ -231,7 +221,8 @@ def create_vpc(self): logger.info("Private subnet created.") - priv_response_2 = self.ec2.create_subnet( + priv_response_2 = backoff( + self.ec2.create_subnet, CidrBlock=self.priv_subnet_cidr_2, AvailabilityZone=f"{self.region}b", VpcId=self.vpc_id, @@ -241,61 +232,61 @@ def create_vpc(self): logger.info("Private subnet created.") - self.ec2.create_tags( + backoff( + self.ec2.create_tags, Resources=[self.priv_vpc_subnet_id_1], - Tags=[{"Key": "Name", "Value": self.job_identifier}], + Tags=self.get_tags_uppercase(Name=self.job_identifier), ) - self.ec2.create_tags( + backoff( + self.ec2.create_tags, Resources=[self.priv_vpc_subnet_id_2], - Tags=[{"Key": "Name", "Value": self.job_identifier}], + Tags=self.get_tags_uppercase(Name=self.job_identifier), ) ig_response = self.ec2.create_internet_gateway() self.internet_gateway_id = ig_response["InternetGateway"]["InternetGatewayId"] - AWSRetry.backoff()(self.ec2.create_tags)( + backoff( + self.ec2.create_tags, Resources=[self.internet_gateway_id], - Tags=[{"Key": "Name", "Value": self.job_identifier}], + Tags=self.get_tags_uppercase(Name=self.job_identifier), ) logger.info(f"Internet gateway {self.internet_gateway_id} created.") # Create the public subnet - pub_response = self.ec2.create_subnet(CidrBlock=self.pub_subnet_cidr, VpcId=self.vpc_id) + pub_response = backoff(self.ec2.create_subnet, CidrBlock=self.pub_subnet_cidr, VpcId=self.vpc_id) logger.info("EIP allocated.") self.pub_vpc_subnet_id = pub_response["Subnet"]["SubnetId"] - self.ec2.create_tags( + backoff( + self.ec2.create_tags, Resources=[self.pub_vpc_subnet_id], - Tags=[{"Key": "Name", "Value": self.job_identifier}], + Tags=self.get_tags_uppercase(Name=self.job_identifier), ) # Create and elastic IP for the NAT Gateway - try: - ip_response = self.ec2.allocate_address(Domain="vpc") - - self.nat_ip_allocation = ip_response["AllocationId"] + ip_response = backoff(self.ec2.allocate_address, Domain="vpc") - logger.info("EIP allocated.") + self.nat_ip_allocation = ip_response["AllocationId"] - self.ec2.create_tags( - Resources=[self.nat_ip_allocation], - Tags=[{"Key": "Name", "Value": self.job_identifier}], - ) + logger.info("EIP allocated.") - except Exception as e: - if "AddressLimitExceeded" in str(e): - raise + backoff( + self.ec2.create_tags, + Resources=[self.nat_ip_allocation], + Tags=self.get_tags_uppercase(Name=self.job_identifier), + ) # Create an internet gateway - self.ec2.attach_internet_gateway(InternetGatewayId=self.internet_gateway_id, VpcId=self.vpc_id) + backoff(self.ec2.attach_internet_gateway, InternetGatewayId=self.internet_gateway_id, VpcId=self.vpc_id) logger.info("Internet Gateway attached.") @@ -311,69 +302,82 @@ def 
create_vpc(self): # Modify the default route table to be used as the public route - while True: - try: - self.ec2.create_route( - DestinationCidrBlock="0.0.0.0/0", - GatewayId=self.internet_gateway_id, - RouteTableId=self.pub_route_table_id, - ) - logger.info("Route created for Internet Gateway.") - break - - except Exception as e: - if "NotFound" in str(e): - time.sleep(5) - logger.info("Internet Gateway not yet created. Sleeping...") - else: - raise + backoff( + self.ec2.create_route, + DestinationCidrBlock="0.0.0.0/0", + GatewayId=self.internet_gateway_id, + RouteTableId=self.pub_route_table_id, + ) # Create a NAT Gateway - nat_response = self.ec2.create_nat_gateway(AllocationId=self.nat_ip_allocation, SubnetId=self.pub_vpc_subnet_id) + nat_response = backoff( + self.ec2.create_nat_gateway, AllocationId=self.nat_ip_allocation, SubnetId=self.pub_vpc_subnet_id + ) self.nat_gateway_id = nat_response["NatGateway"]["NatGatewayId"] + backoff( + self.ec2.create_tags, + Resources=[self.nat_gateway_id], + Tags=self.get_tags_uppercase(Name=self.job_identifier), + ) + logger.info("NAT Gateway created.") # Create a new private route table - prt_response = self.ec2.create_route_table(VpcId=self.vpc_id) + prt_response = backoff(self.ec2.create_route_table, VpcId=self.vpc_id) self.priv_route_table_id = prt_response["RouteTable"]["RouteTableId"] logger.info("Route table created.") - AWSRetry.backoff()(self.ec2.create_tags)( - Resources=[self.priv_route_table_id], - Tags=[{"Key": "Name", "Value": self.job_identifier}], + backoff( + self.ec2.create_tags, + Resources=[self.nat_gateway_id, self.priv_route_table_id], + Tags=self.get_tags_uppercase(Name=self.job_identifier), ) # Associate the private route to the private subnet - self.ec2.associate_route_table(RouteTableId=self.priv_route_table_id, SubnetId=self.priv_vpc_subnet_id_1) + backoff( + self.ec2.associate_route_table, RouteTableId=self.priv_route_table_id, SubnetId=self.priv_vpc_subnet_id_1 + ) logger.info("Route table associated with subnet.") - self.ec2.associate_route_table(RouteTableId=self.priv_route_table_id, SubnetId=self.priv_vpc_subnet_id_2) + backoff( + self.ec2.associate_route_table, RouteTableId=self.priv_route_table_id, SubnetId=self.priv_vpc_subnet_id_2 + ) logger.info("Route table associated with subnet.") # Associate the NAT gateway with the private route - while True: - try: - self.ec2.create_route( - DestinationCidrBlock="0.0.0.0/0", - NatGatewayId=self.nat_gateway_id, - RouteTableId=self.priv_route_table_id, - ) - logger.info("Route created for subnet.") - break - except Exception as e: - if "InvalidNatGatewayID.NotFound" in str(e): - time.sleep(5) - logger.info("Nat Gateway not yet created. 
Sleeping...") - else: - raise + backoff( + self.ec2.create_route, + DestinationCidrBlock="0.0.0.0/0", + NatGatewayId=self.nat_gateway_id, + RouteTableId=self.priv_route_table_id, + ) + + gateway_response = backoff( + self.ec2.create_vpc_endpoint, + VpcId=self.vpc_id, + ServiceName=f"com.amazonaws.{self.region}.s3", + RouteTableIds=[self.priv_route_table_id, self.pub_route_table_id], + VpcEndpointType="Gateway", + PolicyDocument='{"Statement": [{"Action": "*", "Effect": "Allow", "Resource": "*", "Principal": "*"}]}', + ) + + logger.info("S3 gateway created for VPC.") + + self.s3_gateway_endpoint_id = gateway_response["VpcEndpoint"]["VpcEndpointId"] + + backoff( + self.ec2.create_tags, + Resources=[self.s3_gateway_endpoint_id], + Tags=self.get_tags_uppercase(Name=self.job_identifier), + ) def generate_name_value_inputs(self, var_dictionary): """ @@ -540,6 +544,44 @@ def create_compute_environment(self, maxCPUs=10000): """ + logger.debug(f"Creating launch template {self.launch_template_name}") + try: + self.ec2.create_launch_template( + LaunchTemplateName=self.launch_template_name, + LaunchTemplateData={ + "BlockDeviceMappings": [ + { + "DeviceName": "/dev/xvda", + "Ebs": {"VolumeSize": 100, "VolumeType": "gp2"}, + } + ] + }, + ) + except ClientError as error: + if error.response["Error"]["Code"] == "InvalidLaunchTemplateName.AlreadyExistsException": + logger.debug("Launch template exists, skipping creation") + else: + raise error + + while True: + lt_resp = self.ec2.describe_launch_templates(LaunchTemplateNames=[self.launch_template_name]) + launch_templates = lt_resp["LaunchTemplates"] + next_token = lt_resp.get("NextToken") + while next_token: + lt_resp = self.ec2.describe_launch_templates( + LaunchTemplateNames=[self.launch_template_name], + NextToken=next_token, + ) + launch_templates.extend(lt_resp["LaunchTemplates"]) + next_token = lt_resp.get("NextToken") + n_launch_templates = len(launch_templates) + assert n_launch_templates <= 1, f"There are {n_launch_templates} launch templates, this shouldn't happen." 
+ if n_launch_templates == 0: + logger.debug(f"Waiting for the launch template {self.launch_template_name} to be created") + time.sleep(5) + if n_launch_templates == 1: + break + try: compute_resources = { "minvCpus": 0, @@ -548,7 +590,9 @@ def create_compute_environment(self, maxCPUs=10000): "instanceTypes": [ "optimal", ], - "imageId": self.batch_compute_environment_ami, + "launchTemplate": { + "launchTemplateName": self.launch_template_name, + }, "subnets": [self.priv_vpc_subnet_id_1, self.priv_vpc_subnet_id_2], "securityGroupIds": [self.batch_security_group], "instanceRole": self.instance_profile_arn, @@ -565,12 +609,15 @@ def create_compute_environment(self, maxCPUs=10000): else: compute_resources["type"] = "EC2" + compute_resources["tags"] = self.get_tags(Name=f"{self.job_identifier} batch instance") + self.batch.create_compute_environment( computeEnvironmentName=self.batch_compute_environment_name, type="MANAGED", state="ENABLED", computeResources=compute_resources, serviceRole=self.service_role_arn, + tags=self.get_tags(), ) logger.info(f"Compute environment {self.batch_compute_environment_name} created.") @@ -598,6 +645,7 @@ def create_job_queue(self): "computeEnvironment": self.batch_compute_environment_name, }, ], + tags=self.get_tags(), ) # print("JOB QUEUE") @@ -619,9 +667,7 @@ def create_job_queue(self): elif "is not valid" in str(e): # Need to wait a second for the compute environment to complete registration - logger.warning( - "5 second sleep initiated to wait for compute environment creation due to error: " + str(e) - ) + logger.warning("waiting a few seconds for compute environment creation: " + str(e)) time.sleep(5) else: @@ -651,6 +697,7 @@ def create_job_definition(self, docker_image, vcpus, memory, command, env_vars): "environment": self.generate_name_value_inputs(env_vars), }, retryStrategy={"attempts": 2}, + tags=self.get_tags(), ) self.job_definition_arn = response["jobDefinitionArn"] @@ -660,206 +707,17 @@ def submit_job(self, array_size=4): Submits the created job definition and version to be run. 
""" - while True: - try: - self.batch.submit_job( - jobName=self.job_identifier, - jobQueue=self.batch_job_queue_name, - arrayProperties={"size": array_size}, - jobDefinition=self.job_definition_arn, - ) - - logger.info(f"Job {self.job_identifier} submitted.") - break - - except Exception as e: - if "not in VALID state" in str(e): - # Need to wait a second for the compute environment to complete registration - logger.warning("5 second sleep initiated to wait for job queue creation due to error: " + str(e)) - time.sleep(5) - else: - raise - - def create_state_machine_roles(self): - lambda_policy = f"""{{ - "Version": "2012-10-17", - "Statement": [ - {{ - "Effect": "Allow", - "Action": [ - "lambda:InvokeFunction" - ], - "Resource": [ - "arn:aws:lambda:*:*:function:{self.lambda_emr_job_step_function_name}" - ] - }} - ] -}} - - """ - - batch_policy = """{ - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Action": [ - "batch:SubmitJob", - "batch:DescribeJobs", - "batch:TerminateJob" - ], - "Resource": "*" - }, - { - "Effect": "Allow", - "Action": [ - "events:PutTargets", - "events:PutRule", - "events:DescribeRule" - ], - "Resource": [ - "arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule" - ] - } - ] -} - - """ - - sns_policy = f"""{{ - "Version": "2012-10-17", - "Statement": [ - {{ - "Effect": "Allow", - "Action": [ - "sns:Publish" - ], - "Resource": "arn:aws:sns:*:*:{self.sns_state_machine_topic}" - }} - ] - }} - """ - - policies_list = [lambda_policy, batch_policy, sns_policy] - - self.state_machine_role_arn = self.iam_helper.role_stitcher( - self.state_machine_role_name, - "states", - "Permissions for statemachine to run jobs", - policies_list=policies_list, - ) - - def create_state_machine(self): - job_definition = f"""{{ - "Comment": "An example of the Amazon States Language for notification on an AWS Batch job completion", - "StartAt": "Submit Batch Job", - "States": {{ - "Submit Batch Job": {{ - "Type": "Task", - "Resource": "arn:aws:states:::batch:submitJob.sync", - "Parameters": {{ - "JobDefinition": "{self.job_definition_arn}", - "JobName": "{self.job_identifier}", - "JobQueue": "{self.job_queue_arn}", - "ArrayProperties": {{ - "Size.$": "$.array_size" - }} - }}, - "Next": "Notify Batch Success", - "Catch": [ - {{ - "ErrorEquals": [ "States.ALL" ], - "Next": "Notify Batch Failure" - }} - ] - }}, - "Notify Batch Success": {{ - "Type": "Task", - "Resource": "arn:aws:states:::sns:publish", - "Parameters": {{ - "Message": "Batch job submitted through Step Functions succeeded", - "TopicArn": "arn:aws:sns:{self.region}:{self.account}:{self.sns_state_machine_topic}" - }}, - "Next": "Run EMR Job" - }}, - "Notify Batch Failure": {{ - "Type": "Task", - "Resource": "arn:aws:states:::sns:publish", - "Parameters": {{ - "Message": "Batch job submitted through Step Functions failed", - "TopicArn": "arn:aws:sns:{self.region}:{self.account}:{self.sns_state_machine_topic}" - }}, - "Next": "Job Failure" - }}, - "Run EMR Job": {{ - "Type": "Task", - "Resource": "arn:aws:lambda:{self.region}:{self.account}:function:{self.lambda_emr_job_step_function_name}", - "Next": "Notify EMR Job Success", - "Catch": [ - {{ - "ErrorEquals": [ "States.ALL" ], - "Next": "Notify EMR Job Failure" - }} - ] - }}, - "Notify EMR Job Success": {{ - "Type": "Task", - "Resource": "arn:aws:states:::sns:publish", - "Parameters": {{ - "Message": "EMR Job succeeded", - "TopicArn": "arn:aws:sns:{self.region}:{self.account}:{self.sns_state_machine_topic}" - }}, - "End": true - }}, - "Notify EMR Job 
Failure": {{ - "Type": "Task", - "Resource": "arn:aws:states:::sns:publish", - "Parameters": {{ - "Message": "EMR job failed", - "TopicArn": "arn:aws:sns:{self.region}:{self.account}:{self.sns_state_machine_topic}" - }}, - "Next": "Job Failure" - }}, - "Job Failure": {{ - "Type": "Fail" - }} - }} -}} - - """ - - while True: - try: - response = self.step_functions.create_state_machine( - name=self.state_machine_name, - definition=job_definition, - roleArn=self.state_machine_role_arn, - ) - - # print(response) - self.state_machine_arn = response["stateMachineArn"] - logger.info(f"State machine {self.state_machine_name} created.") - break - except Exception as e: - if "AccessDeniedException" in str(e): - logger.info("State machine role not yet registered, sleeping...") - time.sleep(5) - elif "StateMachineAlreadyExists" in str(e): - logger.info("State machine already exists, skipping...") - self.state_machine_arn = f"arn:aws:states:{self.region}:{self.account}:stateMachine:{self.state_machine_name}" # noqa E501 - - break - else: - raise - - def start_state_machine_execution(self, array_size): - self.step_functions.start_execution( - stateMachineArn=self.state_machine_arn, - name=f"{self.state_machine_name}_execution_{int(time.time())}", - input=f'{{"array_size": {array_size}}}', + resp = backoff( + self.batch.submit_job, + jobName=self.job_identifier, + jobQueue=self.batch_job_queue_name, + arrayProperties={"size": array_size}, + jobDefinition=self.job_definition_arn, + tags=self.get_tags(), ) - logger.info(f"Starting state machine {self.state_machine_name}.") + logger.info(f"Job {self.job_identifier} submitted.") + return resp def clean(self): # Get our vpc: @@ -879,24 +737,6 @@ def clean(self): except (KeyError, IndexError): self.vpc_id = None - logger.info("Cleaning up EMR.") - - try: - self.emr.terminate_job_flows(JobFlowIds=[self.emr_cluster_name]) - logger.info(f"EMR cluster {self.emr_cluster_name} deleted.") - - except Exception as e: - if "ResourceNotFoundException" in str(e): - logger.info(f"EMR cluster {self.emr_cluster_name} already MIA - skipping...") - - self.iam_helper.remove_role_from_instance_profile(self.emr_instance_profile_name) - self.iam_helper.delete_instance_profile(self.emr_instance_profile_name) - self.iam_helper.delete_role(self.emr_job_flow_role_name) - self.iam_helper.delete_role(self.emr_service_role_name) - - logger.info(f"EMR clean complete. 
Results bucket and data {self.s3_bucket} have not been deleted.") - - logger.info(f"Deleting Security group {self.emr_cluster_security_group_name}.") default_sg_response = self.ec2.describe_security_groups( Filters=[ { @@ -916,72 +756,6 @@ def clean(self): if len(dsg.ip_permissions_egress): response = dsg.revoke_egress(IpPermissions=dsg.ip_permissions_egress) - sg_response = AWSRetry.backoff()(self.ec2.describe_security_groups)( - Filters=[ - { - "Name": "group-name", - "Values": [ - self.emr_cluster_security_group_name, - ], - }, - ] - ) - - try: - group_id = sg_response["SecurityGroups"][0]["GroupId"] - sg = self.ec2r.SecurityGroup(group_id) - if len(sg.ip_permissions): - sg.revoke_ingress(IpPermissions=sg.ip_permissions) - - while True: - try: - self.ec2.delete_security_group(GroupId=group_id) - break - except ClientError: - logger.info("Waiting for security group ingress rules to be removed ...") - time.sleep(5) - - logger.info(f"Deleted security group {self.emr_cluster_security_group_name}.") - except Exception as e: - if "does not exist" in str(e) or "list index out of range" in str(e): - logger.info(f"Security group {self.emr_cluster_security_group_name} does not exist - skipping...") - else: - raise - - try: - self.aws_lambda.delete_function(FunctionName=self.lambda_emr_job_step_function_name) - except Exception as e: - if "Function not found" in str(e): - logger.info(f"Function {self.lambda_emr_job_step_function_name} not found, skipping...") - else: - raise - - try: - self.s3.delete_object(Bucket=self.s3_bucket, Key=self.s3_lambda_code_emr_cluster_key) - logger.info( - f"S3 object {self.s3_lambda_code_emr_cluster_key} for bucket {self.s3_bucket} deleted." # noqa E501 - ) - except Exception as e: - if "NoSuchBucket" in str(e): - logger.info( - f"S3 object {self.s3_lambda_code_emr_cluster_key} for bucket {self.s3_bucket} missing - not deleted." 
# noqa E501 - ) - else: - raise - - self.iam_helper.delete_role(self.lambda_emr_job_step_execution_role) - - state_machines = self.step_functions.list_state_machines() - - for sm in state_machines["stateMachines"]: - if sm["name"] == self.state_machine_name: - self.state_machine_arn = sm["stateMachineArn"] - self.step_functions.delete_state_machine(stateMachineArn=self.state_machine_arn) - logger.info(f"Deleted state machine {self.state_machine_name}.") - break - - self.iam_helper.delete_role(self.state_machine_role_name) - try: self.batch.update_job_queue(jobQueue=self.batch_job_queue_name, state="DISABLED") @@ -1026,6 +800,15 @@ def clean(self): else: raise + # Delete Launch Template + try: + self.ec2.delete_launch_template(LaunchTemplateName=self.launch_template_name) + except Exception as e: + if "does not exist" in str(e): + logger.info(f"Launch template {self.launch_template_name} does not exist, skipping...") + else: + raise + self.iam_helper.delete_role(self.batch_service_role_name) self.iam_helper.delete_role(self.batch_spot_service_role_name) self.iam_helper.delete_role(self.batch_ecs_task_role_name) @@ -1036,7 +819,7 @@ def clean(self): # Find Nat Gateways and VPCs - response = AWSRetry.backoff()(self.ec2.describe_vpcs)( + response = self.ec2.describe_vpcs( Filters=[ { "Name": "tag:Name", @@ -1050,9 +833,15 @@ def clean(self): for vpc in response["Vpcs"]: this_vpc = vpc["VpcId"] - ng_response = AWSRetry.backoff()(self.ec2.describe_nat_gateways)( - Filters=[{"Name": "vpc-id", "Values": [this_vpc]}] - ) + s3gw_response = self.ec2.describe_vpc_endpoints(Filters=[{"Name": "vpc-id", "Values": [this_vpc]}]) + + for s3gw in s3gw_response["VpcEndpoints"]: + this_s3gw = s3gw["VpcEndpointId"] + + if s3gw["State"] != "deleted": + self.ec2.delete_vpc_endpoints(VpcEndpointIds=[this_s3gw]) + + ng_response = self.ec2.describe_nat_gateways(Filters=[{"Name": "vpc-id", "Values": [this_vpc]}]) for natgw in ng_response["NatGateways"]: this_natgw = natgw["NatGatewayId"] @@ -1060,9 +849,7 @@ def clean(self): if natgw["State"] != "deleted": self.ec2.delete_nat_gateway(NatGatewayId=this_natgw) - rtas_response = AWSRetry.backoff()(self.ec2.describe_route_tables)( - Filters=[{"Name": "vpc-id", "Values": [this_vpc]}] - ) + rtas_response = self.ec2.describe_route_tables(Filters=[{"Name": "vpc-id", "Values": [this_vpc]}]) for route_table in rtas_response["RouteTables"]: route_table_id = route_table["RouteTableId"] @@ -1081,14 +868,13 @@ def clean(self): rt_counter = rt_counter - 1 if "DependencyViolation" in str(e): logger.info( - "Waiting for association to be released before deleting route table. " - "Sleeping..." - ) + "Waiting for association to be released before deleting route table. Sleeping..." 
+ ) # noqa E501 time.sleep(5) else: raise - igw_response = AWSRetry.backoff()(self.ec2.describe_internet_gateways)( + igw_response = self.ec2.describe_internet_gateways( Filters=[{"Name": "tag:Name", "Values": [self.job_identifier]}] ) @@ -1134,7 +920,7 @@ def clean(self): else: raise - AWSRetry.backoff()(self.ec2.delete_vpc)(VpcId=this_vpc) + self.ec2.delete_vpc(VpcId=this_vpc) # Find the Elastic IP from the NAT response = self.ec2.describe_addresses( @@ -1152,315 +938,13 @@ def clean(self): response = self.ec2.release_address(AllocationId=this_address) - def create_emr_security_groups(self): try: - response = self.ec2.create_security_group( - Description="EMR Job Flow Security Group (full cluster access)", - GroupName=self.emr_cluster_security_group_name, - VpcId=self.vpc_id, - ) - self.emr_cluster_security_group_id = response["GroupId"] - - except Exception as e: - if "already exists for VPC" in str(e): - logger.info("Security group for EMR already exists, skipping ...") - response = self.ec2.describe_security_groups( - Filters=[ - { - "Name": "group-name", - "Values": [ - self.emr_cluster_security_group_name, - ], - }, - ] - ) - - self.emr_cluster_security_group_id = response["SecurityGroups"][0]["GroupId"] - else: - raise - - try: - response = self.ec2.authorize_security_group_ingress( - GroupId=self.emr_cluster_security_group_id, - IpPermissions=[ - dict( - IpProtocol="-1", - UserIdGroupPairs=[ - dict( - GroupId=self.emr_cluster_security_group_id, - UserId=self.account, - ) - ], - ) - ], - ) - except Exception as e: - if "already exists" in str(e): - logger.info("Security group egress rule for EMR already exists, skipping ...") + self.ec2.delete_security_group(GroupName=f"dask-{self.job_identifier}") + except ClientError as error: + if error.response["Error"]["Code"] == "InvalidGroup.NotFound": + pass else: - raise - - def create_emr_iam_roles(self): - self.emr_service_role_arn = self.iam_helper.role_stitcher( - self.emr_service_role_name, - "elasticmapreduce", - f"EMR Service Role {self.job_identifier}", - managed_policie_arns=["arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"], - ) - - emr_policy = """{ - "Version": "2012-10-17", - "Statement": [ - { - "Sid": "VisualEditor0", - "Effect": "Allow", - "Action": [ - "glue:GetCrawler", - "glue:CreateTable", - "glue:DeleteCrawler", - "glue:StartCrawler", - "glue:StopCrawler", - "glue:DeleteTable", - "glue:ListCrawlers", - "glue:UpdateCrawler", - "glue:CreateCrawler", - "glue:GetCrawlerMetrics", - "glue:BatchDeleteTable" - ], - "Resource": "*" - }, - { - "Sid": "VisualEditor1", - "Effect": "Allow", - "Action": [ - "iam:PassRole" - ], - "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole-default" - } - ] -}""" - - self.emr_job_flow_role_arn = self.iam_helper.role_stitcher( - self.emr_job_flow_role_name, - "ec2", - f"EMR Job Flow Role {self.job_identifier}", - managed_policie_arns=["arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role"], - policies_list=[emr_policy], - ) - - try: - response = self.iam.create_instance_profile(InstanceProfileName=self.emr_instance_profile_name) - - self.emr_instance_profile_arn = response["InstanceProfile"]["Arn"] - - logger.info("EMR Instance Profile created") - - response = self.iam.add_role_to_instance_profile( - InstanceProfileName=self.emr_instance_profile_name, - RoleName=self.emr_job_flow_role_name, - ) - - except Exception as e: - if "EntityAlreadyExists" in str(e): - logger.info("EMR Instance Profile not created - already exists") - response = 
self.iam.get_instance_profile(InstanceProfileName=self.emr_instance_profile_name) - self.emr_instance_profile_arn = response["InstanceProfile"]["Arn"] - - def upload_assets(self): - logger.info("Uploading EMR support assets...") - fs = S3FileSystem() - here = os.path.dirname(os.path.abspath(__file__)) - emr_folder = f"{self.s3_bucket}/{self.s3_bucket_prefix}/{self.s3_emr_folder_name}" - fs.makedirs(emr_folder) - - # bsb_post.sh - bsb_post_bash = f"""#!/bin/bash - -aws s3 cp "s3://{self.s3_bucket}/{self.s3_bucket_prefix}/emr/bsb_post.py" bsb_post.py -/home/hadoop/miniconda/bin/python bsb_post.py "{self.s3_bucket}" "{self.s3_bucket_prefix}" - - """ - with fs.open(f"{emr_folder}/bsb_post.sh", "w", encoding="utf-8") as f: - f.write(bsb_post_bash) - - # bsb_post.py - fs.put(os.path.join(here, "s3_assets", "bsb_post.py"), f"{emr_folder}/bsb_post.py") - - # bootstrap-dask-custom - fs.put( - os.path.join(here, "s3_assets", "bootstrap-dask-custom"), - f"{emr_folder}/bootstrap-dask-custom", - ) - - # postprocessing.py - with fs.open(f"{emr_folder}/postprocessing.tar.gz", "wb") as f: - with tarfile.open(fileobj=f, mode="w:gz") as tarf: - tarf.add( - os.path.join(here, "..", "postprocessing.py"), - arcname="postprocessing.py", - ) - tarf.add( - os.path.join(here, "s3_assets", "setup_postprocessing.py"), - arcname="setup.py", - ) - - logger.info("EMR support assets uploaded.") - - def create_emr_cluster_function(self): - script_name = f"s3://{self.s3_bucket}/{self.s3_bucket_prefix}/{self.s3_emr_folder_name}/bsb_post.sh" - bootstrap_action = f"s3://{self.s3_bucket}/{self.s3_bucket_prefix}/{self.s3_emr_folder_name}/bootstrap-dask-custom" # noqa E501 - - run_job_flow_args = dict( - Name=self.emr_cluster_name, - LogUri=self.emr_log_uri, - ReleaseLabel="emr-5.23.0", - Instances={ - "InstanceGroups": [ - { - "Market": "SPOT" if self.batch_use_spot else "ON_DEMAND", - "InstanceRole": "MASTER", - "InstanceType": self.emr_manager_instance_type, - "InstanceCount": 1, - }, - { - "Market": "SPOT" if self.batch_use_spot else "ON_DEMAND", - "InstanceRole": "CORE", - "InstanceType": self.emr_worker_instance_type, - "InstanceCount": self.emr_worker_instance_count, - }, - ], - "Ec2SubnetId": self.priv_vpc_subnet_id_1, - "KeepJobFlowAliveWhenNoSteps": False, - "EmrManagedMasterSecurityGroup": self.emr_cluster_security_group_id, - "EmrManagedSlaveSecurityGroup": self.emr_cluster_security_group_id, - "ServiceAccessSecurityGroup": self.batch_security_group, - }, - Applications=[ - {"Name": "Hadoop"}, - ], - BootstrapActions=[ - { - "Name": "launchFromS3", - "ScriptBootstrapAction": { - "Path": bootstrap_action, - "Args": [f"s3://{self.s3_bucket}/{self.s3_bucket_prefix}/emr/postprocessing.tar.gz"], - }, - }, - ], - Steps=[ - { - "Name": "Dask", - "ActionOnFailure": "TERMINATE_CLUSTER", - "HadoopJarStep": { - "Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar", - "Args": [script_name], - }, - }, - ], - VisibleToAllUsers=True, - JobFlowRole=self.emr_instance_profile_name, - ServiceRole=self.emr_service_role_name, - Tags=[ - {"Key": "org", "Value": "ops"}, - ], - AutoScalingRole="EMR_AutoScaling_DefaultRole", - ScaleDownBehavior="TERMINATE_AT_TASK_COMPLETION", - EbsRootVolumeSize=100, - ) - - with io.BytesIO() as f: - f.write(json.dumps(run_job_flow_args).encode()) - f.seek(0) - self.s3.upload_fileobj(f, self.s3_bucket, self.s3_lambda_emr_config_key) - - lambda_filename = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "s3_assets", - "lambda_function.py", - ) - with open(lambda_filename, 
"r") as f: - function_script = f.read() - with io.BytesIO() as f: - with zipfile.ZipFile(f, mode="w", compression=zipfile.ZIP_STORED) as zf: - zi = zipfile.ZipInfo("emr_function.py") - zi.date_time = time.localtime() - zi.external_attr = 0o100755 << 16 - zf.writestr(zi, function_script, zipfile.ZIP_DEFLATED) - f.seek(0) - self.s3.upload_fileobj(f, self.s3_bucket, self.s3_lambda_code_emr_cluster_key) - - while True: - try: - self.aws_lambda.create_function( - FunctionName=self.lambda_emr_job_step_function_name, - Runtime="python3.7", - Role=self.lambda_emr_job_step_execution_role_arn, - Handler="emr_function.lambda_handler", - Code={ - "S3Bucket": self.s3_bucket, - "S3Key": self.s3_lambda_code_emr_cluster_key, - }, - Description=f"Lambda for emr cluster execution on job {self.job_identifier}", - Timeout=900, - MemorySize=128, - Publish=True, - Environment={ - "Variables": { - "REGION": self.region, - "BUCKET": self.s3_bucket, - "EMR_CONFIG_JSON_KEY": self.s3_lambda_emr_config_key, - } - }, - Tags={"job": self.job_identifier}, - ) - - logger.info(f"Lambda function {self.lambda_emr_job_step_function_name} created.") - break - - except Exception as e: - if "role defined for the function cannot be assumed" in str(e): - logger.info( - f"Lambda role not registered for {self.lambda_emr_job_step_function_name} - sleeping ..." - ) - time.sleep(5) - elif "Function already exist" in str(e): - logger.info(f"Lambda function {self.lambda_emr_job_step_function_name} exists, skipping...") - break - elif "ARN does not refer to a valid principal" in str(e): - logger.info("Waiting for roles/permissions to propagate to allow Lambda function creation ...") - time.sleep(5) - else: - raise - - -class AwsSNS(AwsJobBase): - def __init__(self, job_name, aws_config, boto3_session): - super().__init__(job_name, aws_config, boto3_session) - self.sns = self.session.client("sns") - self.sns_state_machine_topic_arn = None - - def create_topic(self): - response = self.sns.create_topic(Name=self.sns_state_machine_topic) - - logger.info(f"Simple notifications topic {self.sns_state_machine_topic} created.") - - self.sns_state_machine_topic_arn = response["TopicArn"] - - def subscribe_to_topic(self): - self.sns.subscribe( - TopicArn=self.sns_state_machine_topic_arn, - Protocol="email", - Endpoint=self.operator_email, - ) - - logger.info( - f"Operator {self.operator_email} subscribed to topic - please confirm via email to recieve state machine progress messages." 
# noqa 501 - ) - - def clean(self): - self.sns.delete_topic(TopicArn=f"arn:aws:sns:{self.region}:{self.account}:{self.sns_state_machine_topic}") - - logger.info(f"Simple notifications topic {self.sns_state_machine_topic} deleted.") + raise error class AwsBatch(DockerBatchBase): @@ -1471,8 +955,8 @@ def __init__(self, project_filename): self.project_filename = project_filename self.region = self.cfg["aws"]["region"] - self.ecr = boto3.client("ecr", region_name=self.region) - self.s3 = boto3.client("s3", region_name=self.region) + self.ecr = boto3.client("ecr", region_name=self.region, config=boto_client_config) + self.s3 = boto3.client("s3", region_name=self.region, config=boto_client_config) self.s3_bucket = self.cfg["aws"]["s3"]["bucket"] self.s3_bucket_prefix = self.cfg["aws"]["s3"]["prefix"].rstrip("/") self.batch_env_use_spot = self.cfg["aws"]["use_spot"] @@ -1480,29 +964,40 @@ def __init__(self, project_filename): self.boto3_session = boto3.Session(region_name=self.region) @staticmethod - def validate_instance_types(project_file): + def validate_dask_settings(project_file): cfg = get_project_configuration(project_file) - aws_config = cfg["aws"] - boto3_session = boto3.Session(region_name=aws_config["region"]) - ec2 = boto3_session.client("ec2") - job_base = AwsJobBase("genericjobid", aws_config, boto3_session) - instance_types_requested = set() - instance_types_requested.add(job_base.emr_manager_instance_type) - instance_types_requested.add(job_base.emr_worker_instance_type) - inst_type_resp = ec2.describe_instance_type_offerings( - Filters=[{"Name": "instance-type", "Values": list(instance_types_requested)}] - ) - instance_types_available = set([x["InstanceType"] for x in inst_type_resp["InstanceTypeOfferings"]]) - if not instance_types_requested == instance_types_available: - instance_types_not_available = instance_types_requested - instance_types_available - raise ValidationError( - f"The instance type(s) {', '.join(instance_types_not_available)} are not available in region {aws_config['region']}." # noqa E501 - ) + if "emr" in cfg["aws"]: + logger.warning("The `aws.emr` configuration is no longer used and is ignored. Recommend removing.") + dask_cfg = cfg["aws"]["dask"] + errors = [] + mem_rules = { + 1024: (2, 8, 1), + 2048: (4, 16, 1), + 4096: (8, 30, 1), + 8192: (16, 60, 4), + 16384: (32, 120, 8), + } + for node_type in ("scheduler", "worker"): + mem = dask_cfg.get(f"{node_type}_memory", 8 * 1024) + if mem % 1024 != 0: + errors.append(f"`aws.dask.{node_type}_memory` = {mem}, needs to be a multiple of 1024.") + mem_gb = mem // 1024 + min_gb, max_gb, incr_gb = mem_rules[dask_cfg.get(f"{node_type}_cpu", 2 * 1024)] + if not (min_gb <= mem_gb <= max_gb and (mem_gb - min_gb) % incr_gb == 0): + errors.append( + f"`aws.dask.{node_type}_memory` = {mem}, " + f"should be between {min_gb * 1024} and {max_gb * 1024} in a multiple of {incr_gb * 1024}." 
+ ) + if errors: + errors.append("See https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html") + raise ValidationError("\n".join(errors)) + + return True @staticmethod def validate_project(project_file): super(AwsBatch, AwsBatch).validate_project(project_file) - AwsBatch.validate_instance_types(project_file) + AwsBatch.validate_dask_settings(project_file) @property def docker_image(self): @@ -1512,6 +1007,14 @@ def docker_image(self): def weather_dir(self): return self._weather_dir + @property + def results_dir(self): + return f"{self.s3_bucket}/{self.s3_bucket_prefix}/results" + + @property + def output_dir(self): + return f"{self.s3_bucket}/{self.s3_bucket_prefix}" + @property def container_repo(self): repo_name = self.docker_image @@ -1525,6 +1028,10 @@ def container_repo(self): repo = resp["repository"] return repo + @property + def image_url(self): + return f"{self.container_repo['repositoryUri']}:{self.job_identifier}" + def build_image(self): """ Build the docker image to use in the batch simulation @@ -1532,8 +1039,77 @@ def build_image(self): root_path = pathlib.Path(os.path.abspath(__file__)).parent.parent.parent if not (root_path / "Dockerfile").exists(): raise RuntimeError(f"The needs to be run from the root of the repo, found {root_path}") - logger.debug("Building docker image") - self.docker_client.images.build(path=str(root_path), tag=self.docker_image, rm=True) + + # Make the buildstock/resources/.aws_docker_image dir to store logs + local_log_dir = pathlib.Path(self.buildstock_dir, "resources", ".aws_docker_image") + if not os.path.exists(local_log_dir): + os.makedirs(local_log_dir) + + # Determine whether or not to build the image with custom gems bundled in + if self.cfg.get("baseline", dict()).get("custom_gems", False): + # Ensure the custom Gemfile exists in the buildstock dir + local_gemfile_path = pathlib.Path(self.buildstock_dir, "resources", "Gemfile") + if not local_gemfile_path.exists(): + raise AttributeError(f"baseline:custom_gems = True, but did not find Gemfile at {local_gemfile_path}") + + # Copy the custom Gemfile into the buildstockbatch repo + new_gemfile_path = root_path / "Gemfile" + shutil.copyfile(local_gemfile_path, new_gemfile_path) + logger.info(f"Copying custom Gemfile from {local_gemfile_path}") + + # Choose the custom-gems stage in the Dockerfile, + # which runs bundle install to build custom gems into the image + stage = "buildstockbatch-custom-gems" + else: + # Choose the base stage in the Dockerfile, + # which stops before bundling custom gems into the image + stage = "buildstockbatch" + + logger.info(f"Building docker image stage: {stage} from OpenStudio {self.os_version}") + img, build_logs = self.docker_client.images.build( + path=str(root_path), + tag=self.docker_image, + rm=True, + target=stage, + platform="linux/amd64", + buildargs={"OS_VER": self.os_version}, + ) + build_image_log = os.path.join(local_log_dir, "build_image.log") + with open(build_image_log, "w") as f_out: + f_out.write("Built image") + for line in build_logs: + for itm_type, item_msg in line.items(): + if itm_type in ["stream", "status"]: + try: + f_out.write(f"{item_msg}") + except UnicodeEncodeError: + pass + logger.debug(f"Review docker image build log: {build_image_log}") + + # Report and confirm the openstudio version from the image + os_ver_cmd = "openstudio openstudio_version" + container_output = self.docker_client.containers.run( + self.docker_image, os_ver_cmd, remove=True, name="list_openstudio_version" + ) + assert 
self.os_version in container_output.decode() + + # Report gems included in the docker image. + # The OpenStudio Docker image installs the default gems + # to /var/oscli/gems, and the custom docker image + # overwrites these with the custom gems. + list_gems_cmd = ( + "openstudio --bundle /var/oscli/Gemfile --bundle_path /var/oscli/gems " + "--bundle_without native_ext gem_list" + ) + container_output = self.docker_client.containers.run( + self.docker_image, list_gems_cmd, remove=True, name="list_gems" + ) + gem_list_log = os.path.join(local_log_dir, "openstudio_gem_list_output.log") + with open(gem_list_log, "wb") as f_out: + f_out.write(container_output) + for line in container_output.decode().split("\n"): + logger.debug(line) + logger.debug(f"Review custom gems list at: {gem_list_log}") def push_image(self): """ @@ -1570,9 +1146,6 @@ def clean(self): batch_env = AwsBatchEnv(self.job_identifier, self.cfg["aws"], self.boto3_session) batch_env.clean() - sns_env = AwsSNS(self.job_identifier, self.cfg["aws"], self.boto3_session) - sns_env.clean() - def upload_batch_files_to_cloud(self, tmppath): """Implements :func:`DockerBase.upload_batch_files_to_cloud`""" logger.debug("Uploading Batch files to S3") @@ -1625,37 +1198,44 @@ def start_batch_job(self, batch_info): REGION=self.region, ) - image_url = "{}:{}".format(self.container_repo["repositoryUri"], self.job_identifier) - job_env_cfg = self.cfg["aws"].get("job_environment", {}) batch_env.create_job_definition( - image_url, - command=["python3.8", "-m", "buildstockbatch.aws.aws"], + self.image_url, + command=["python3", "-m", "buildstockbatch.aws.aws"], vcpus=job_env_cfg.get("vcpus", 1), memory=job_env_cfg.get("memory", 1024), env_vars=env_vars, ) - # SNS Topic - sns_env = AwsSNS(self.job_identifier, self.cfg["aws"], self.boto3_session) - sns_env.create_topic() - sns_env.subscribe_to_topic() - - # State machine - batch_env.create_state_machine_roles() - batch_env.create_state_machine() - - # EMR Function - batch_env.upload_assets() - batch_env.create_emr_iam_roles() - batch_env.create_emr_security_groups() - batch_env.create_emr_lambda_roles() - batch_env.create_emr_cluster_function() - # start job - batch_env.start_state_machine_execution(batch_info.job_count) + job_info = batch_env.submit_job(array_size=self.batch_array_size) + + # Monitor job status + n_succeeded_last_time = 0 + with tqdm.tqdm(desc="Running Simulations", total=self.batch_array_size) as progress_bar: + job_status = None + while job_status not in ("SUCCEEDED", "FAILED"): + time.sleep(10) + job_desc_resp = batch_env.batch.describe_jobs(jobs=[job_info["jobId"]]) + job_status = job_desc_resp["jobs"][0]["status"] + + jobs_resp = batch_env.batch.list_jobs(arrayJobId=job_info["jobId"], jobStatus="SUCCEEDED") + n_succeeded = len(jobs_resp["jobSummaryList"]) + next_token = jobs_resp.get("nextToken") + while next_token is not None: + jobs_resp = batch_env.batch.list_jobs( + arrayJobId=job_info["jobId"], + jobStatus="SUCCEEDED", + nextToken=next_token, + ) + n_succeeded += len(jobs_resp["jobSummaryList"]) + next_token = jobs_resp.get("nextToken") + progress_bar.update(n_succeeded - n_succeeded_last_time) + n_succeeded_last_time = n_succeeded - logger.info("Batch job submitted. Check your email to subscribe to notifications.") + logger.info(f"Batch job status: {job_status}") + if job_status == "FAILED": + raise RuntimeError("Batch Job Failed. 
Go look at the CloudWatch logs.") @classmethod def run_job(cls, job_id, bucket, prefix, job_name, region): @@ -1668,7 +1248,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region): """ logger.debug(f"region: {region}") - s3 = boto3.client("s3") + s3 = boto3.client("s3", config=boto_client_config) sim_dir = pathlib.Path("/var/simdata/openstudio") @@ -1695,7 +1275,36 @@ def run_job(cls, job_id, bucket, prefix, job_name, region): weather_dir = sim_dir / "weather" os.makedirs(weather_dir, exist_ok=True) - epws_to_download = cls.get_epws_to_download(sim_dir, jobs_d) + # Make a lookup of which parameter points to the weather file from options_lookup.tsv + with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f: + tsv_reader = csv.reader(f, delimiter="\t") + next(tsv_reader) # skip headers + param_name = None + epws_by_option = {} + for row in tsv_reader: + row_has_epw = [x.endswith(".epw") for x in row[2:]] + if sum(row_has_epw): + if row[0] != param_name and param_name is not None: + raise RuntimeError( + f"The epw files are specified in options_lookup.tsv under more than one parameter type: {param_name}, {row[0]}" + ) # noqa: E501 + epw_filename = row[row_has_epw.index(True) + 2].split("=")[1].split("/")[-1] + param_name = row[0] + option_name = row[1] + epws_by_option[option_name] = epw_filename + + # Look through the buildstock.csv to find the appropriate location and epw + epws_to_download = set() + building_ids = [x[0] for x in jobs_d["batch"]] + with open( + sim_dir / "lib" / "housing_characteristics" / "buildstock.csv", + "r", + encoding="utf-8", + ) as f: + csv_reader = csv.DictReader(f) + for row in csv_reader: + if int(row["Building"]) in building_ids: + epws_to_download.add(epws_by_option[row[param_name]]) # Download the epws needed for these simulations for epw_filename in epws_to_download: @@ -1706,7 +1315,88 @@ def run_job(cls, job_id, bucket, prefix, job_name, region): logger.debug("Extracting {}".format(epw_filename)) f_out.write(gzip.decompress(f_gz.getvalue())) - cls.run_simulations(cfg, jobs_d, job_id, sim_dir, S3FileSystem(), f"{bucket}/{prefix}") + cls.run_simulations(cfg, job_id, jobs_d, sim_dir, S3FileSystem(), f"{bucket}/{prefix}") + + def get_fs(self): + return S3FileSystem() + + def get_dask_client(self): + dask_cfg = self.cfg["aws"]["dask"] + + batch_env = AwsBatchEnv(self.job_identifier, self.cfg["aws"], self.boto3_session) + m = 1024 + self.dask_cluster = FargateCluster( + region_name=self.region, + fargate_spot=True, + image=self.image_url, + cluster_name_template=f"dask-{self.job_identifier}", + scheduler_cpu=dask_cfg.get("scheduler_cpu", 2 * m), + scheduler_mem=dask_cfg.get("scheduler_memory", 8 * m), + worker_cpu=dask_cfg.get("worker_cpu", 2 * m), + worker_mem=dask_cfg.get("worker_memory", 8 * m), + n_workers=dask_cfg["n_workers"], + task_role_policies=["arn:aws:iam::aws:policy/AmazonS3FullAccess"], + tags=batch_env.get_tags(), + ) + self.dask_client = Client(self.dask_cluster) + return self.dask_client + + def cleanup_dask(self): + self.dask_client.close() + self.dask_cluster.close() + + def upload_results(self, *args, **kwargs): + """Do nothing because the results are already on S3""" + return self.s3_bucket, self.s3_bucket_prefix + "/results/parquet" + + def process_results(self, *args, **kwargs): + with tempfile.TemporaryDirectory() as tmpdir: + tmppath = pathlib.Path(tmpdir) + container_workpath = pathlib.PurePosixPath("/var/simdata/openstudio") + + cfg = deepcopy(self.cfg) + container_buildstock_dir = 
str(container_workpath / "buildstock") + cfg["buildstock_directory"] = container_buildstock_dir + cfg["project_directory"] = str(pathlib.Path(self.project_dir).relative_to(self.buildstock_dir)) + + with open(tmppath / "project_config.yml", "w") as f: + f.write(yaml.dump(cfg, Dumper=yaml.SafeDumper)) + container_cfg_path = str(container_workpath / "project_config.yml") + + with open(tmppath / "args.json", "w") as f: + json.dump([args, kwargs], f) + + credentials = boto3.Session().get_credentials().get_frozen_credentials() + env = { + "AWS_ACCESS_KEY_ID": credentials.access_key, + "AWS_SECRET_ACCESS_KEY": credentials.secret_key, + } + if credentials.token: + env["AWS_SESSION_TOKEN"] = credentials.token + env["POSTPROCESSING_INSIDE_DOCKER_CONTAINER"] = "true" + + logger.info("Starting container for postprocessing") + container = self.docker_client.containers.run( + self.image_url, + ["python3", "-m", "buildstockbatch.aws.aws", container_cfg_path], + volumes={ + tmpdir: {"bind": str(container_workpath), "mode": "rw"}, + self.buildstock_dir: {"bind": container_buildstock_dir, "mode": "ro"}, + }, + environment=env, + name="bsb_post", + auto_remove=True, + detach=True, + ) + for msg in container.logs(stream=True): + logger.debug(msg) + + def _process_results_inside_container(self): + with open("/var/simdata/openstudio/args.json", "r") as f: + args, kwargs = json.load(f) + + logger.info("Running postprocessing in container") + super().process_results(*args, **kwargs) @log_error_details() @@ -1751,20 +1441,37 @@ def main(): job_name = os.environ["JOB_NAME"] region = os.environ["REGION"] AwsBatch.run_job(job_id, s3_bucket, s3_prefix, job_name, region) + elif get_bool_env_var("POSTPROCESSING_INSIDE_DOCKER_CONTAINER"): + parser = argparse.ArgumentParser() + parser.add_argument("project_filename") + args = parser.parse_args() + batch = AwsBatch(args.project_filename) + batch._process_results_inside_container() else: parser = argparse.ArgumentParser() parser.add_argument("project_filename") - parser.add_argument( + group = parser.add_mutually_exclusive_group() + group.add_argument( "-c", "--clean", action="store_true", help="After the simulation is done, run with --clean to clean up AWS environment", ) - parser.add_argument( + group.add_argument( "--validateonly", help="Only validate the project YAML file and references. Nothing is executed", action="store_true", ) + group.add_argument( + "--postprocessonly", + help="Only do postprocessing, useful for when the simulations are already done", + action="store_true", + ) + group.add_argument( + "--crawl", + help="Only do the crawling in Athena. 
When simulations and postprocessing are done.", + action="store_true", + ) args = parser.parse_args() # validate the project, and in case of the --validateonly flag return True if validation passes @@ -1775,10 +1482,18 @@ def main(): batch = AwsBatch(args.project_filename) if args.clean: batch.clean() + elif args.postprocessonly: + batch.build_image() + batch.push_image() + batch.process_results() + elif args.crawl: + batch.process_results(skip_combine=True, use_dask_cluster=False) else: batch.build_image() batch.push_image() batch.run_batch() + batch.process_results() + batch.clean() if __name__ == "__main__": diff --git a/buildstockbatch/aws/awsbase.py b/buildstockbatch/aws/awsbase.py index fdac12e8..7ecbf097 100644 --- a/buildstockbatch/aws/awsbase.py +++ b/buildstockbatch/aws/awsbase.py @@ -1,8 +1,11 @@ import logging +from botocore.config import Config logger = logging.getLogger(__name__) +boto_client_config = Config(retries={"max_attempts": 5, "mode": "standard"}) + class AWSIAMHelper: logger.propagate = False @@ -13,7 +16,7 @@ def __init__(self, session): :param session: boto3 Session from 'parent' job base class """ self.session = session - self.iam = self.session.client("iam") + self.iam = self.session.client("iam", config=boto_client_config) def role_stitcher( self, @@ -141,9 +144,9 @@ def __init__(self, job_identifier, aws_config, boto3_session): self.session = boto3_session self.iam_helper = AWSIAMHelper(self.session) self.iam = self.iam_helper.iam - self.s3 = self.session.client("s3") + self.s3 = self.session.client("s3", config=boto_client_config) self.job_identifier = job_identifier - self.account = self.session.client("sts").get_caller_identity().get("Account") + self.account = self.session.client("sts", config=boto_client_config).get_caller_identity().get("Account") self.region = aws_config["region"] self.operator_email = aws_config["notifications_email"] @@ -155,29 +158,9 @@ def __init__(self, job_identifier, aws_config, boto3_session): self.s3_lambda_emr_config_key = f"{self.s3_bucket_prefix}/lambda_functions/emr_config.json" self.s3_emr_folder_name = "emr" - # EMR - emr_config = aws_config.get("emr", {}) - self.emr_manager_instance_type = emr_config.get("manager_instance_type", "m5.4xlarge") - self.emr_worker_instance_type = emr_config.get("worker_instance_type", "r5.4xlarge") - self.emr_worker_instance_count = emr_config.get("worker_instance_count", 4) - self.emr_cluster_security_group_name = f"{self.job_identifier}_emr_security_group" - self.emr_cluster_name = f"{self.job_identifier}_emr_dask_cluster" - self.emr_job_flow_role_name = f"{self.job_identifier}_emr_job_flow_role" - self.emr_job_flow_role_arn = "" - self.emr_service_role_name = f"{self.job_identifier}_emr_service_role" - self.emr_service_role_arn = "" - self.emr_cluster_security_group_id = "" - self.emr_log_uri = f"s3://{self.s3_bucket}/{self.s3_bucket_prefix}/emrlogs/" - self.emr_instance_profile_name = f"{self.job_identifier}_emr_instance_profile" - - # Lambda - self.lambda_emr_job_step_execution_role = f"{self.job_identifier}_emr_job_step_execution_role" - self.lambda_emr_job_step_function_name = f"{self.job_identifier}_emr_job_step_submission" - self.lambda_emr_job_step_execution_role_arn = "" - # Batch self.batch_compute_environment_name = f"computeenvionment_{self.job_identifier}" - self.batch_compute_environment_ami = "ami-0184013939261b626" + self.launch_template_name = f"launch_templ_{self.job_identifier}" self.batch_job_queue_name = f"job_queue_{self.job_identifier}" self.batch_service_role_name = 
f"batch_service_role_{self.job_identifier}" self.batch_instance_role_name = f"batch_instance_role_{self.job_identifier}" @@ -188,13 +171,6 @@ def __init__(self, job_identifier, aws_config, boto3_session): self.batch_use_spot = aws_config.get("use_spot", True) self.batch_spot_bid_percent = aws_config.get("spot_bid_percent", 100) - # Step Functions - self.state_machine_name = f"{self.job_identifier}_state_machine" - self.state_machine_role_name = f"{self.job_identifier}_state_machine_role" - - # SNS - self.sns_state_machine_topic = f"{self.job_identifier}_state_machine_notifications" - # VPC self.vpc_name = self.job_identifier self.vpc_id = "" # will be available after VPC creation @@ -202,13 +178,26 @@ def __init__(self, job_identifier, aws_config, boto3_session): self.priv_vpc_subnet_id_1 = "REPL" # will be available after VPC creation self.priv_vpc_subnet_id_2 = "REPL" # will be available after VPC creation + def get_tags(self, **kwargs): + tags = kwargs.copy() + tags.update(self.aws_config.get("tags", {})) + return tags + + def get_tags_uppercase(self, **kwargs): + tags = self.get_tags(**kwargs) + return [{"Key": k, "Value": v} for k, v in tags.items()] + + def get_tags_lowercase(self, _caps=True, **kwargs): + tags = self.get_tags(**kwargs) + return [{"key": k, "value": v} for k, v in tags.items()] + def __repr__(self): return f""" Job Identifier: {self.job_identifier} S3 Bucket for Source Data: {self.s3_bucket} S3 Prefix for Source Data: {self.s3_bucket_prefix} -A state machine {self.state_machine_name} will execute an AWS Batch job {self.job_identifier} against the source data. +This will execute an AWS Batch job {self.job_identifier} against the source data. Notifications of execution progress will be sent to {self.operator_email} once the email subscription is confirmed. Once processing is complete the state machine will then launch an EMR cluster with a job to combine the results and create an AWS Glue table. diff --git a/buildstockbatch/aws/s3_assets/bootstrap-dask-custom b/buildstockbatch/aws/s3_assets/bootstrap-dask-custom deleted file mode 100644 index ce6de174..00000000 --- a/buildstockbatch/aws/s3_assets/bootstrap-dask-custom +++ /dev/null @@ -1,106 +0,0 @@ -#!/bin/bash - -set -e - -# ----------------------------------------------------------------------------- -# 1. Check if running on the master node. If not, there's nothing do. -# ----------------------------------------------------------------------------- -grep -q '"isMaster": true' /mnt/var/lib/info/instance.json \ -|| { echo "Not running on master node, nothing to do" && exit 0; } - - -# ----------------------------------------------------------------------------- -# 2. Install Miniconda -# ----------------------------------------------------------------------------- -echo "Installing Miniconda" -curl https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -o /tmp/miniconda.sh -bash /tmp/miniconda.sh -b -p $HOME/miniconda -rm /tmp/miniconda.sh -echo -e '\nexport PATH=$HOME/miniconda/bin:$PATH' >> $HOME/.bashrc -source $HOME/.bashrc -conda update conda -y - - -# ----------------------------------------------------------------------------- -# 3. Install packages to use in packaged environment -# -# We install a few packages by default, and allow users to extend this list -# with a CLI flag: -# -# - dask-yarn >= 0.4.1, for deploying Dask on YARN. -# - pyarrow for working with hdfs, parquet, ORC, etc... 
-# - s3fs for access to s3 -# - nomkl to minimize environment size -# - conda-pack for packaging the environment for distribution -# ----------------------------------------------------------------------------- -echo "Installing base packages" -conda install \ --c conda-forge \ --y \ --q \ -python=3.7 \ -"dask>=2021.5" \ -"distributed>=2021.5" \ -"dask-yarn>=0.9.0" \ -"pandas>=1.0.0,!=1.0.4" \ -"pyarrow>=3.0.0" \ -"s3fs>=0.4.2,<0.5.0" \ -"numpy>=1.20.0" \ -conda-pack \ -tornado=5 - -aws s3 cp "$1" $HOME/postprocessing.tar.gz -pip install $HOME/postprocessing.tar.gz - -# ----------------------------------------------------------------------------- -# 4. Package the environment to be distributed to worker nodes -# ----------------------------------------------------------------------------- -echo "Packaging environment" -conda pack -q -o $HOME/environment.tar.gz - - -# ----------------------------------------------------------------------------- -# 5. List all packages in the worker environment -# ----------------------------------------------------------------------------- -echo "Packages installed in the worker environment:" -conda list - - -# ----------------------------------------------------------------------------- -# 6. Configure Dask -# -# This isn't necessary, but for this particular bootstrap script it will make a -# few things easier: -# -# - Configure the cluster's dashboard link to show the proxied version through -# jupyter-server-proxy. This allows access to the dashboard with only an ssh -# tunnel to the notebook. -# -# - Specify the pre-packaged python environment, so users don't have to -# -# - Set the default deploy-mode to local, so the dashboard proxying works -# -# - Specify the location of the native libhdfs library so pyarrow can find it -# on the workers and the client (if submitting applications). 
-# ------------------------------------------------------------------------------ -echo "Configuring Dask" -mkdir -p $HOME/.config/dask -cat <> $HOME/.config/dask/config.yaml -distributed: - dashboard: - link: "/proxy/{port}/status" - -yarn: - environment: /home/hadoop/environment.tar.gz - deploy-mode: local - - worker: - env: - ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/ - - client: - env: - ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/ -EOT -# Also set ARROW_LIBHDFS_DIR in ~/.bashrc so it's set for the local user -echo -e '\nexport ARROW_LIBHDFS_DIR=/usr/lib/hadoop/lib/native' >> $HOME/.bashrc diff --git a/buildstockbatch/aws/s3_assets/bsb_post.py b/buildstockbatch/aws/s3_assets/bsb_post.py deleted file mode 100644 index c1bade48..00000000 --- a/buildstockbatch/aws/s3_assets/bsb_post.py +++ /dev/null @@ -1,67 +0,0 @@ -import argparse -import boto3 -from dask_yarn import YarnCluster -from dask.distributed import Client -import json -from s3fs import S3FileSystem - -from postprocessing import ( - combine_results, - create_athena_tables, - remove_intermediate_files, -) - - -def do_postprocessing(s3_bucket, s3_bucket_prefix): - fs = S3FileSystem() - with fs.open(f"{s3_bucket}/{s3_bucket_prefix}/config.json", "r") as f: - cfg = json.load(f) - - ec2 = boto3.client("ec2") - - with open("/mnt/var/lib/info/job-flow.json", "r") as f: - job_flow_info = json.load(f) - - for instance_group in job_flow_info["instanceGroups"]: - if instance_group["instanceRole"].lower() == "core": - instance_type = instance_group["instanceType"] - instance_count = instance_group["requestedInstanceCount"] - - instance_info = ec2.describe_instance_types(InstanceTypes=[instance_type]) - - dask_worker_vcores = cfg["aws"].get("emr", {}).get("dask_worker_vcores", 2) - instance_memory = instance_info["InstanceTypes"][0]["MemoryInfo"]["SizeInMiB"] - instance_ncpus = instance_info["InstanceTypes"][0]["VCpuInfo"]["DefaultVCpus"] - n_dask_workers = instance_count * instance_ncpus // dask_worker_vcores - worker_memory = round(instance_memory / instance_ncpus * dask_worker_vcores * 0.95) - - cluster = YarnCluster( - deploy_mode="local", - worker_vcores=dask_worker_vcores, - worker_memory="{} MiB".format(worker_memory), - n_workers=n_dask_workers, - ) - - client = Client(cluster) # noqa E841 - - results_s3_loc = f"{s3_bucket}/{s3_bucket_prefix}/results" - - combine_results(fs, results_s3_loc, cfg) - - aws_conf = cfg.get("postprocessing", {}).get("aws", {}) - if "athena" in aws_conf: - tbl_prefix = s3_bucket_prefix.split("/")[-1] - if not tbl_prefix: - tbl_prefix = cfg["aws"]["job_identifier"] - create_athena_tables(aws_conf, tbl_prefix, s3_bucket, f"{s3_bucket_prefix}/results/parquet") - - keep_individual_timeseries = cfg.get("postprocessing", {}).get("keep_individual_timeseries", False) - remove_intermediate_files(fs, results_s3_loc, keep_individual_timeseries) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("s3_bucket") - parser.add_argument("s3_bucket_prefix") - args = parser.parse_args() - do_postprocessing(args.s3_bucket, args.s3_bucket_prefix) diff --git a/buildstockbatch/aws/s3_assets/lambda_function.py b/buildstockbatch/aws/s3_assets/lambda_function.py deleted file mode 100644 index 8b742846..00000000 --- a/buildstockbatch/aws/s3_assets/lambda_function.py +++ /dev/null @@ -1,24 +0,0 @@ -import os -import io -import json -import boto3 -from pprint import pprint - - -def lambda_handler(event, context): - # some prep work needed for this - check your security groups - there may 
default groups if any EMR cluster - # was launched from the console - also prepare a bucket for logs - - # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html - - session = boto3.Session(region_name=os.environ["REGION"]) - - s3 = session.client("s3") - with io.BytesIO() as f: - s3.download_fileobj(os.environ["BUCKET"], os.environ["EMR_CONFIG_JSON_KEY"], f) - args = json.loads(f.getvalue()) - - emr = session.client("emr") - - response = emr.run_job_flow(**args) - pprint(response) diff --git a/buildstockbatch/aws/s3_assets/setup_postprocessing.py b/buildstockbatch/aws/s3_assets/setup_postprocessing.py deleted file mode 100644 index 6700198a..00000000 --- a/buildstockbatch/aws/s3_assets/setup_postprocessing.py +++ /dev/null @@ -1,16 +0,0 @@ -from setuptools import setup - -setup( - name="buildstockbatch-postprocessing", - version="0.1", - description="Just the stand alone postprocessing functions from Buildstock-Batch", - py_modules=["postprocessing"], - install_requires=[ - "dask[complete]>=2022.10.0", - "s3fs>=0.4.2,<0.5.0", - "boto3", - "pandas>=1.0.0,!=1.0.4", - "pyarrow>=3.0.0", - "numpy>=1.20.0", - ], -) diff --git a/buildstockbatch/base.py b/buildstockbatch/base.py index c231a0e9..57de30d6 100644 --- a/buildstockbatch/base.py +++ b/buildstockbatch/base.py @@ -897,30 +897,47 @@ def validate_openstudio_version(project_file): def get_dask_client(self): return Client() - def process_results(self, skip_combine=False, force_upload=False): - self.get_dask_client() # noqa: F841 - - if self.cfg["workflow_generator"]["type"] == "residential_hpxml": - if "simulation_output_report" in self.cfg["workflow_generator"]["args"].keys(): - if "timeseries_frequency" in self.cfg["workflow_generator"]["args"]["simulation_output_report"].keys(): - do_timeseries = ( - self.cfg["workflow_generator"]["args"]["simulation_output_report"]["timeseries_frequency"] - != "none" - ) - else: - do_timeseries = "timeseries_csv_export" in self.cfg["workflow_generator"]["args"].keys() + def cleanup_dask(self): + pass - fs = LocalFileSystem() - if not skip_combine: - postprocessing.combine_results(fs, self.results_dir, self.cfg, do_timeseries=do_timeseries) + def get_fs(self): + return LocalFileSystem() - aws_conf = self.cfg.get("postprocessing", {}).get("aws", {}) - if "s3" in aws_conf or force_upload: - s3_bucket, s3_prefix = postprocessing.upload_results( - aws_conf, self.output_dir, self.results_dir, self.sampler.csv_path - ) - if "athena" in aws_conf: - postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir), s3_bucket, s3_prefix) + def upload_results(self, *args, **kwargs): + return postprocessing.upload_results(*args, **kwargs) + + def process_results(self, skip_combine=False, use_dask_cluster=True): + if use_dask_cluster: + self.get_dask_client() # noqa F841 + + try: + wfg_args = self.cfg["workflow_generator"].get("args", {}) + if self.cfg["workflow_generator"]["type"] == "residential_hpxml": + if "simulation_output_report" in wfg_args.keys(): + if "timeseries_frequency" in wfg_args["simulation_output_report"].keys(): + do_timeseries = wfg_args["simulation_output_report"]["timeseries_frequency"] != "none" + else: + do_timeseries = "timeseries_csv_export" in wfg_args.keys() + + fs = self.get_fs() + if not skip_combine: + postprocessing.combine_results(fs, self.results_dir, self.cfg, do_timeseries=do_timeseries) + + aws_conf = self.cfg.get("postprocessing", {}).get("aws", {}) + if "s3" in aws_conf or "aws" in self.cfg: + s3_bucket, s3_prefix = self.upload_results( 
+ aws_conf, self.output_dir, self.results_dir, self.sampler.csv_path + ) + if "athena" in aws_conf: + postprocessing.create_athena_tables( + aws_conf, + os.path.basename(self.output_dir), + s3_bucket, + s3_prefix, + ) + finally: + if use_dask_cluster: + self.cleanup_dask() keep_individual_timeseries = self.cfg.get("postprocessing", {}).get("keep_individual_timeseries", False) postprocessing.remove_intermediate_files(fs, self.results_dir, keep_individual_timeseries) diff --git a/buildstockbatch/cloud/docker_base.py b/buildstockbatch/cloud/docker_base.py index b5bb7f4c..3176a23a 100644 --- a/buildstockbatch/cloud/docker_base.py +++ b/buildstockbatch/cloud/docker_base.py @@ -30,7 +30,7 @@ from buildstockbatch import postprocessing from buildstockbatch.base import BuildStockBatchBase -from buildstockbatch.utils import ContainerRuntime, calc_hash_for_file, compress_file, read_csv +from buildstockbatch.utils import ContainerRuntime, calc_hash_for_file, compress_file, read_csv, get_bool_env_var logger = logging.getLogger(__name__) @@ -57,6 +57,9 @@ class BatchInfo: def __init__(self, project_filename): super().__init__(project_filename) + if get_bool_env_var("POSTPROCESSING_INSIDE_DOCKER_CONTAINER"): + return + self.docker_client = docker.DockerClient.from_env() try: self.docker_client.ping() @@ -248,12 +251,7 @@ def _prep_jobs_for_batch(self, tmppath): n_sims = n_datapoints * (len(self.cfg.get("upgrades", [])) + 1) logger.debug("Total number of simulations = {}".format(n_sims)) - # This is the maximum number of jobs that can be in an array - if self.batch_array_size <= self.MAX_JOB_COUNT: - max_array_size = self.batch_array_size - else: - max_array_size = self.MAX_JOB_COUNT - n_sims_per_job = math.ceil(n_sims / max_array_size) + n_sims_per_job = math.ceil(n_sims / self.batch_array_size) n_sims_per_job = max(n_sims_per_job, 2) logger.debug("Number of simulations per array job = {}".format(n_sims_per_job)) @@ -396,8 +394,23 @@ def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path): with open(sim_dir / "os_stdout.log", "w") as f_out: try: logger.debug("Running {}".format(sim_id)) + cli_cmd = ["openstudio", "run", "-w", "in.osw"] + if cfg.get("baseline", dict()).get("custom_gems", False): + cli_cmd = [ + "openstudio", + "--bundle", + "/var/oscli/Gemfile", + "--bundle_path", + "/var/oscli/gems", + "--bundle_without", + "native_ext", + "run", + "-w", + "in.osw", + "--debug", + ] subprocess.run( - ["openstudio", "run", "-w", "in.osw"], + cli_cmd, check=True, stdout=f_out, stderr=subprocess.STDOUT, diff --git a/buildstockbatch/hpc.py b/buildstockbatch/hpc.py index 56876dcd..ade93702 100644 --- a/buildstockbatch/hpc.py +++ b/buildstockbatch/hpc.py @@ -41,6 +41,7 @@ path_rel_to_file, get_project_configuration, read_csv, + get_bool_env_var, ) from buildstockbatch import postprocessing from buildstockbatch.__version__ import __version__ as bsb_version @@ -49,10 +50,6 @@ logger = logging.getLogger(__name__) -def get_bool_env_var(varname): - return os.environ.get(varname, "0").lower() in ("true", "t", "1", "y", "yes") - - class SlurmBatch(BuildStockBatchBase): DEFAULT_SYS_IMAGE_DIR = None HPC_NAME = None @@ -959,7 +956,7 @@ def main(): assert not measures_only assert not sampling_only if upload_only: - batch.process_results(skip_combine=True, force_upload=True) + batch.process_results(skip_combine=True) else: batch.process_results() else: diff --git a/buildstockbatch/local.py b/buildstockbatch/local.py index 6efcb50d..1a47f36b 100644 --- a/buildstockbatch/local.py +++ 
b/buildstockbatch/local.py @@ -420,7 +420,7 @@ def main(): if args.measures_only or args.samplingonly: return if args.uploadonly: - batch.process_results(skip_combine=True, force_upload=True) + batch.process_results(skip_combine=True) else: batch.process_results() diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py index 403a482d..37ed938c 100644 --- a/buildstockbatch/postprocessing.py +++ b/buildstockbatch/postprocessing.py @@ -363,9 +363,9 @@ def get_upgrade_list(cfg): def write_metadata_files(fs, parquet_root_dir, partition_columns): - df = dd.read_parquet(parquet_root_dir) + df = dd.read_parquet(parquet_root_dir, filesystem=fs) sch = pa.Schema.from_pandas(df._meta_nonempty) - parquet.write_metadata(sch, f"{parquet_root_dir}/_common_metadata") + parquet.write_metadata(sch, f"{parquet_root_dir}/_common_metadata", filesystem=fs) logger.info(f"Written _common_metadata to {parquet_root_dir}") if partition_columns: @@ -554,16 +554,12 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True): f"partitions which go into {ngroup} column group(s) of {partition_columns}" ) - if isinstance(fs, LocalFileSystem): - ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}/" - else: - assert isinstance(fs, S3FileSystem) - ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}/" + ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}" fs.makedirs(ts_out_loc) logger.info(f"Created directory {ts_out_loc} for writing. Now concatenating ...") - src_path = f"{ts_in_dir}/up{upgrade_id:02d}/" + src_path = f"{ts_in_dir}/up{upgrade_id:02d}" concat_partial = dask.delayed( partial( concat_and_normalize, @@ -575,7 +571,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True): ) ) partition_vals_list = [ - list(partition_df.loc[bldg_id_list[0]].values) if partition_columns else [] + (list(partition_df.loc[bldg_id_list[0]].values) if partition_columns else []) for bldg_id_list in bldg_id_groups ] diff --git a/buildstockbatch/sampler/downselect.py b/buildstockbatch/sampler/downselect.py index 0820e366..64375ab7 100644 --- a/buildstockbatch/sampler/downselect.py +++ b/buildstockbatch/sampler/downselect.py @@ -7,6 +7,7 @@ :copyright: (c) 2020 by The Alliance for Sustainable Energy :license: BSD-3 """ + import gzip import logging import math diff --git a/buildstockbatch/sampler/residential_quota.py b/buildstockbatch/sampler/residential_quota.py index ce503bf0..73d9b185 100644 --- a/buildstockbatch/sampler/residential_quota.py +++ b/buildstockbatch/sampler/residential_quota.py @@ -7,6 +7,7 @@ :copyright: (c) 2020 by The Alliance for Sustainable Energy :license: BSD-3 """ + import docker import logging import os diff --git a/buildstockbatch/schemas/v0.3.yaml b/buildstockbatch/schemas/v0.3.yaml index 2a24a412..f0ab72e2 100644 --- a/buildstockbatch/schemas/v0.3.yaml +++ b/buildstockbatch/schemas/v0.3.yaml @@ -26,18 +26,20 @@ aws-spec: spot_bid_percent: num(min=1, max=100, required=False) batch_array_size: num(min=1, max=10000, required=True) notifications_email: regex('^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$', name='email', required=True) - emr: include('aws-emr-spec', required=False) + dask: include('aws-dask-spec', required=True) job_environment: include('aws-job-environment', required=False) + tags: map(str(), str(), required=False) aws-job-environment: vcpus: int(min=1, max=36, required=False) memory: int(min=1024, required=False) -aws-emr-spec: - manager_instance_type: str(required=False) - worker_instance_type: str(required=False) - worker_instance_count: int(min=1, required=False) - 
dask_worker_vcores: int(min=1, required=False) +aws-dask-spec: + scheduler_cpu: enum(1024, 2048, 4096, 8192, 16384, required=False) + scheduler_memory: int(min=1024, required=False) + worker_cpu: enum(1024, 2048, 4096, 8192, 16384, required=False) + worker_memory: int(min=1024, required=False) + n_workers: int(min=1, required=True) hpc-spec: account: str(required=True) diff --git a/buildstockbatch/test/test_aws.py b/buildstockbatch/test/test_aws.py new file mode 100644 index 00000000..3577f613 --- /dev/null +++ b/buildstockbatch/test/test_aws.py @@ -0,0 +1,85 @@ +import os +import yaml +import logging + +from buildstockbatch.aws.aws import AwsBatch + +here = os.path.dirname(os.path.abspath(__file__)) +logging.basicConfig(level="DEBUG") # Use DEBUG, INFO, or WARNING +logger = logging.getLogger(__name__) + + +def test_custom_gem_install(basic_residential_project_file): + project_filename, results_dir = basic_residential_project_file() + + # Add aws and custom_gems to the project file + with open(project_filename, "r") as f: + cfg = yaml.safe_load(f) + # custom_gems + cfg["baseline"]["custom_gems"] = True + # AWS + cfg["aws"] = {} + cfg["aws"]["job_identifier"] = "testaws" + cfg["aws"]["s3"] = {} + cfg["aws"]["s3"]["bucket"] = "resbldg-datasets" + cfg["aws"]["s3"]["prefix"] = "testing/external_demo_project" + cfg["aws"]["region"] = "us-west-2" + cfg["aws"]["use_spot"] = True + cfg["aws"]["batch_array_size"] = 100 + cfg["aws"]["notifications_email"] = "user@example.com" + with open(project_filename, "w") as f: + yaml.dump(cfg, f) + + buildstock_directory = cfg["buildstock_directory"] + + batch = AwsBatch(project_filename) + batch.build_image() + + gem_list_log_log_path = os.path.join( + buildstock_directory, + "resources", + ".aws_docker_image", + "openstudio_gem_list_output.log", + ) + assert os.path.exists(gem_list_log_log_path) + with open(gem_list_log_log_path, "r") as gem_list: + contents = gem_list.read() + custom_gem = "/var/oscli/gems/ruby/2.7.0/gems/openstudio-standards-0.2.0" + assert custom_gem in contents + + +def test_no_custom_gem_install(basic_residential_project_file): + project_filename, results_dir = basic_residential_project_file() + + # Add aws to the project file + with open(project_filename, "r") as f: + cfg = yaml.safe_load(f) + # AWS + cfg["aws"] = {} + cfg["aws"]["job_identifier"] = "testaws" + cfg["aws"]["s3"] = {} + cfg["aws"]["s3"]["bucket"] = "resbldg-datasets" + cfg["aws"]["s3"]["prefix"] = "testing/external_demo_project" + cfg["aws"]["region"] = "us-west-2" + cfg["aws"]["use_spot"] = True + cfg["aws"]["batch_array_size"] = 100 + cfg["aws"]["notifications_email"] = "user@example.com" + with open(project_filename, "w") as f: + yaml.dump(cfg, f) + + buildstock_directory = cfg["buildstock_directory"] + + batch = AwsBatch(project_filename) + batch.build_image() + + gem_list_log_log_path = os.path.join( + buildstock_directory, + "resources", + ".aws_docker_image", + "openstudio_gem_list_output.log", + ) + assert os.path.exists(gem_list_log_log_path) + with open(gem_list_log_log_path, "r") as gem_list: + contents = gem_list.read() + custom_gem = "/var/oscli/gems/ruby/2.7.0/gems/openstudio-standards-0.2.0" + assert custom_gem not in contents diff --git a/buildstockbatch/test/test_docker_base.py b/buildstockbatch/test/test_docker_base.py index a6829da4..88e70f1a 100644 --- a/buildstockbatch/test/test_docker_base.py +++ b/buildstockbatch/test/test_docker_base.py @@ -1,4 +1,5 @@ """Tests for the DockerBatchBase class.""" + from fsspec.implementations.local import 
LocalFileSystem import gzip import json diff --git a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/Gemfile b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/Gemfile index 0beaaa52..4e30acd4 100644 --- a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/Gemfile +++ b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/Gemfile @@ -14,7 +14,7 @@ ruby "~> 2.7.0" gem 'openstudio-extension', '= 0.5.1' gem 'openstudio-workflow', '= 2.3.1' -gem 'openstudio-standards', '= 0.2.16' +gem 'openstudio-standards', '= 0.2.0' # Deliberately obsolete version to ensure custom gems works as expected # gem 'openstudio-standards', git: 'https://github.com/NREL/openstudio-standards.git', ref: '971514ee0a64262a9c81788fd85fc60d8dd69980' group :native_ext do diff --git a/buildstockbatch/test/test_validation.py b/buildstockbatch/test/test_validation.py index afad8d19..ee915940 100644 --- a/buildstockbatch/test/test_validation.py +++ b/buildstockbatch/test/test_validation.py @@ -18,6 +18,7 @@ import json import pathlib from buildstockbatch.hpc import EagleBatch, SlurmBatch, KestrelBatch +from buildstockbatch.aws.aws import AwsBatch from buildstockbatch.local import LocalBatch from buildstockbatch.base import BuildStockBatchBase, ValidationError from buildstockbatch.test.shared_testing_stuff import ( @@ -60,6 +61,11 @@ def test_local_docker_validation_is_classmethod(): assert inspect.ismethod(LocalBatch.validate_project) +def test_aws_batch_validation_is_static(): + assert isinstance(AwsBatch.validate_project, types.FunctionType) + assert isinstance(AwsBatch.validate_dask_settings, types.FunctionType) + + def test_complete_schema_passes_validation(): assert BuildStockBatchBase.validate_project_schema(os.path.join(example_yml_dir, "complete-schema.yml")) @@ -348,6 +354,38 @@ def test_validate_resstock_or_comstock_version(mocker): BuildStockBatchBase.validate_resstock_or_comstock_version(str(proj_filename)) +def test_dask_config(): + orig_filename = os.path.join(example_yml_dir, "minimal-schema.yml") + cfg = get_project_configuration(orig_filename) + with tempfile.TemporaryDirectory() as tmpdir: + cfg["aws"] = { + "dask": { + "scheduler_cpu": 1024, + "scheduler_memory": 2048, + "worker_cpu": 1024, + "worker_memory": 2048, + "n_workers": 1, + } + } + test1_filename = os.path.join(tmpdir, "test1.yml") + with open(test1_filename, "w") as f: + json.dump(cfg, f) + AwsBatch.validate_dask_settings(test1_filename) + cfg["aws"]["dask"]["scheduler_memory"] = 9 * 1024 + test2_filename = os.path.join(tmpdir, "test2.yml") + with open(test2_filename, "w") as f: + json.dump(cfg, f) + with pytest.raises(ValidationError, match=r"between 2048 and 8192"): + AwsBatch.validate_dask_settings(test2_filename) + cfg["aws"]["dask"]["scheduler_memory"] = 8 * 1024 + cfg["aws"]["dask"]["worker_memory"] = 1025 + test3_filename = os.path.join(tmpdir, "test3.yml") + with open(test3_filename, "w") as f: + json.dump(cfg, f) + with pytest.raises(ValidationError, match=r"needs to be a multiple of 1024"): + AwsBatch.validate_dask_settings(test3_filename) + + def test_validate_eagle_output_directory(): minimal_yml = pathlib.Path(example_yml_dir, "minimal-schema.yml") with pytest.raises(ValidationError, match=r"must be in /scratch or /projects"): diff --git a/buildstockbatch/utils.py b/buildstockbatch/utils.py index ea4b503c..e9453b38 100644 --- a/buildstockbatch/utils.py +++ b/buildstockbatch/utils.py @@ -139,3 +139,7 @@ def compress_file(in_filename, out_filename): def 
calc_hash_for_file(filename): with open(filename, "rb") as f: return hashlib.sha256(f.read()).hexdigest() + + +def get_bool_env_var(varname): + return os.environ.get(varname, "0").lower() in ("true", "t", "1", "y", "yes") diff --git a/buildstockbatch/workflow_generator/commercial.py b/buildstockbatch/workflow_generator/commercial.py index 6495acfe..b0b1c4cd 100644 --- a/buildstockbatch/workflow_generator/commercial.py +++ b/buildstockbatch/workflow_generator/commercial.py @@ -120,16 +120,16 @@ def create_osw(self, sim_id, building_id, upgrade_idx): if "lifetime" in option: apply_upgrade_measure["arguments"]["option_{}_lifetime".format(opt_num)] = option["lifetime"] if "apply_logic" in option: - apply_upgrade_measure["arguments"][ - "option_{}_apply_logic".format(opt_num) - ] = self.make_apply_logic_arg(option["apply_logic"]) + apply_upgrade_measure["arguments"]["option_{}_apply_logic".format(opt_num)] = ( + self.make_apply_logic_arg(option["apply_logic"]) + ) for cost_num, cost in enumerate(option.get("costs", []), 1): for arg in ("value", "multiplier"): if arg not in cost: continue - apply_upgrade_measure["arguments"][ - "option_{}_cost_{}_{}".format(opt_num, cost_num, arg) - ] = cost[arg] + apply_upgrade_measure["arguments"]["option_{}_cost_{}_{}".format(opt_num, cost_num, arg)] = ( + cost[arg] + ) if "package_apply_logic" in measure_d: apply_upgrade_measure["arguments"]["package_apply_logic"] = self.make_apply_logic_arg( measure_d["package_apply_logic"] diff --git a/buildstockbatch/workflow_generator/residential_hpxml.py b/buildstockbatch/workflow_generator/residential_hpxml.py index 71cab179..f4442bd2 100644 --- a/buildstockbatch/workflow_generator/residential_hpxml.py +++ b/buildstockbatch/workflow_generator/residential_hpxml.py @@ -523,16 +523,16 @@ def create_osw(self, sim_id, building_id, upgrade_idx): if "lifetime" in option: apply_upgrade_measure["arguments"]["option_{}_lifetime".format(opt_num)] = option["lifetime"] if "apply_logic" in option: - apply_upgrade_measure["arguments"][ - "option_{}_apply_logic".format(opt_num) - ] = self.make_apply_logic_arg(option["apply_logic"]) + apply_upgrade_measure["arguments"]["option_{}_apply_logic".format(opt_num)] = ( + self.make_apply_logic_arg(option["apply_logic"]) + ) for cost_num, cost in enumerate(option.get("costs", []), 1): for arg in ("value", "multiplier"): if arg not in cost: continue - apply_upgrade_measure["arguments"][ - "option_{}_cost_{}_{}".format(opt_num, cost_num, arg) - ] = cost[arg] + apply_upgrade_measure["arguments"]["option_{}_cost_{}_{}".format(opt_num, cost_num, arg)] = ( + cost[arg] + ) if "package_apply_logic" in measure_d: apply_upgrade_measure["arguments"]["package_apply_logic"] = self.make_apply_logic_arg( measure_d["package_apply_logic"] diff --git a/docs/changelog/changelog_dev.rst b/docs/changelog/changelog_dev.rst index bd619d14..d13ed5cd 100644 --- a/docs/changelog/changelog_dev.rst +++ b/docs/changelog/changelog_dev.rst @@ -35,3 +35,9 @@ Development Changelog :pullreq: 426 A bugfix for gracefully handling empty data_point_out.json files. + + .. change:: + :tags: aws, feature + :pullreq: 345 + + Major update to get AWS Batch run environment working. diff --git a/docs/installation.rst b/docs/installation.rst index 50cf380a..a6d19b52 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -65,6 +65,8 @@ For Windows, the process is similar. .. _OpenStudio release: https://github.com/NREL/OpenStudio/releases +.. _bsb-python: + BuildStockBatch Python Library .............................. 
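The ``test_dask_config`` cases added in ``buildstockbatch/test/test_validation.py`` pin down the behavior expected of ``AwsBatch.validate_dask_settings``: each scheduler and worker memory setting must be a multiple of 1024 MiB and must fall within the Fargate range allowed for the chosen CPU size. The implementation itself is not shown in this diff, so the following is only a rough sketch of such a check, using the Fargate task-size table from the AWS documentation and a hypothetical ``check_dask_settings`` helper name:

    # Illustrative sketch only -- not the validate_dask_settings implementation
    # from this PR. It mirrors the constraints exercised by test_dask_config.
    import yaml

    from buildstockbatch.base import ValidationError

    # Allowable memory (MiB) range for each Fargate task CPU size (per AWS docs).
    FARGATE_MEMORY_RANGES = {
        1024: (2048, 8192),
        2048: (4096, 16384),
        4096: (8192, 30720),
        8192: (16384, 61440),
        16384: (32768, 122880),
    }


    def check_dask_settings(project_filename):
        with open(project_filename) as f:
            dask_cfg = yaml.safe_load(f)["aws"]["dask"]
        for role in ("scheduler", "worker"):
            cpu = dask_cfg.get(f"{role}_cpu", 2048)
            memory = dask_cfg.get(f"{role}_memory", 8192)
            if memory % 1024 != 0:
                raise ValidationError(f"{role}_memory = {memory}, needs to be a multiple of 1024.")
            min_mem, max_mem = FARGATE_MEMORY_RANGES[cpu]
            if not min_mem <= memory <= max_mem:
                raise ValidationError(
                    f"{role}_memory = {memory} is outside the allowable range "
                    f"between {min_mem} and {max_mem} MiB for {cpu} CPU units."
                )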
@@ -231,10 +233,46 @@ Amazon Web Services (Beta) .. warning:: - The AWS version of buildstockbatch is currently broken. A remedy is in - progress. Thanks for your patience. + The AWS version of buildstockbatch is in active development. Use at your own + risk. It's provided as-is with no promise of support. + +Docker +...... + +Install either `Docker Desktop `_ or +`Docker Engine `_ for your platform. + +BuildStockBatch Python Library +.............................. + +Install the buildstockbatch python library as described in :ref:`bsb-python` for +the local installation. You'll need to install with the ``aws`` extra as follows. + +For a standard installation + +:: + + cd /path/to/buildstockbatch + python -m pip install -e ".[aws]" + +For developer installation + +:: + + cd /path/to/buildstockbatch + python -m pip install -e ".[dev,aws]" + pre-commit install + +AWS User Configuration +...................... + +Follow the instructions for :ref:`aws-user-config-local` on the local install. +Your AWS user account or role needs to have pretty expansive permissions to +create IAM roles, VPCs, compute resources, etc. + +.. todo:: + + Define permission set needed. -The installation instructions are the same as the :ref:`local-install` -installation. You will need to use an AWS account with appropriate permissions. -The first time you run ``buildstock_aws`` it may take several minutes, -especially over a slower internet connection as it is downloading and building a docker image. + For NREL users, the typical ``resbldg-user`` or ``developers`` role in the + nrel-aws-resbldg account is probably insufficient. diff --git a/docs/project_defn.rst b/docs/project_defn.rst index c592c1b8..b85c8413 100644 --- a/docs/project_defn.rst +++ b/docs/project_defn.rst @@ -218,42 +218,59 @@ on the `AWS Batch `_ service. on AWS. In a future version we will break backwards compatibility in the config file and have more consistent options. -* ``job_identifier``: A unique string that starts with an alphabetical character, +* ``job_identifier``: (required) A unique string that starts with an alphabetical character, is up to 10 characters long, and only has letters, numbers or underscore. This is used to name all the AWS service objects to be created and differentiate it from other jobs. -* ``s3``: Configuration for project data storage on s3. When running on AWS, +* ``s3``: (required) Configuration for project data storage on s3. When running on AWS, this overrides the s3 configuration in the :ref:`post-config-opts`. * ``bucket``: The s3 bucket this project will use for simulation output and processed data storage. * ``prefix``: The s3 prefix at which the data will be stored. -* ``region``: The AWS region in which the batch will be run and data stored. -* ``use_spot``: true or false. Defaults to false if missing. This tells the project +* ``region``: (required) The AWS region in which the batch will be run and data stored. Probably "us-west-2" if you're at NREL. +* ``use_spot``: (optional) true or false. Defaults to true if missing. This tells the project to use the `Spot Market `_ for data simulations, which typically yields about 60-70% cost savings. -* ``spot_bid_percent``: Percent of on-demand price you're willing to pay for +* ``spot_bid_percent``: (optional) Percent of on-demand price you're willing to pay for your simulations. The batch will wait to run until the price drops below this - level. -* ``batch_array_size``: Number of concurrent simulations to run. Max: 10000.
-* ``notifications_email``: Email to notify you of simulation completion. + level. Usually leave this one blank. +* ``batch_array_size``: (required) Number of concurrent simulations to run. Max: 10,000. + Unless this is a small run with fewer than 100,000 simulations, just set this + to 10,000. +* ``notifications_email``: (required) Email to notify you of simulation completion. You'll receive an email at the beginning where you'll need to accept the - subscription to receive further notification emails. -* ``emr``: Optional key to specify options for postprocessing using an EMR cluster. Generally the defaults should work fine. - - * ``manager_instance_type``: The `instance type`_ to use for the EMR master node. Default: ``m5.xlarge``. - * ``worker_instance_type``: The `instance type`_ to use for the EMR worker nodes. Default: ``r5.4xlarge``. - * ``worker_instance_count``: The number of worker nodes to use. Same as ``eagle.postprocessing.n_workers``. - Increase this for a large dataset. Default: 2. - * ``dask_worker_vcores``: The number of cores for each dask worker. Increase this if your dask workers are running out of memory. Default: 2. + subscription to receive further notification emails. This doesn't work right now. +* ``dask``: (required) Dask configuration for postprocessing + + * ``n_workers``: (required) Number of dask workers to use. + * ``scheduler_cpu``: (optional) One of ``[1024, 2048, 4096, 8192, 16384]``. + Default: 2048. CPU to allocate for the scheduler task. 1024 = 1 VCPU. See + `Fargate Task CPU and memory`_ for allowable combinations of CPU and + memory. + * ``scheduler_memory``: (optional) Amount of memory to allocate to the + scheduler task. Default: 8192. See `Fargate Task CPU and memory`_ for + allowable combinations of CPU and memory. + * ``worker_cpu``: (optional) One of ``[1024, 2048, 4096, 8192, 16384]``. + Default: 2048. CPU to allocate for the worker tasks. 1024 = 1 VCPU. See + `Fargate Task CPU and memory`_ for allowable combinations of CPU and + memory. + * ``worker_memory``: (optional) Amount of memory to allocate to the worker + tasks. Default: 8192. See `Fargate Task CPU and memory`_ for allowable + combinations of CPU and memory. * ``job_environment``: Specifies the computing requirements for each simulation. - * ``vcpus``: Number of CPUs needed. default: 1. - * ``memory``: Amount of RAM memory needed for each simulation in MiB. default 1024. For large multifamily buildings + * ``vcpus``: (optional) Number of CPUs needed. Default: 1. This probably doesn't need to be changed. + * ``memory``: (optional) Amount of RAM memory needed for each simulation in MiB. default 1024. For large multifamily buildings this works better if set to 2048. +* ``tags``: (optional) This is a list of key-value pairs to attach as tags to + all the AWS objects created in the process of running the simulation. If you + are at NREL, please fill out the following tags so we can track and allocate + costs: ``billingId``, ``org``, and ``owner``. .. _instance type: https://aws.amazon.com/ec2/instance-types/ +.. _Fargate Task CPU and memory: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/fargate-tasks-services.html#fargate-tasks-size .. _postprocessing: @@ -288,7 +305,8 @@ Athena. This process requires appropriate access to an AWS account to be configured on your machine. You will need to set this up wherever you use buildstockbatch. If you don't have keys, consult your AWS administrator to get them set up. 
The appropriate keys are already installed on Eagle and Kestrel, so -no action is required. +no action is required. If you run on AWS, this step is already done since the +simulation outputs are already on S3. * :ref:`Local AWS setup instructions ` * `Detailed instructions from AWS `_ diff --git a/docs/run_sims.rst b/docs/run_sims.rst index 6d99ca51..91d5aafb 100644 --- a/docs/run_sims.rst +++ b/docs/run_sims.rst @@ -84,6 +84,11 @@ tool. .. command-output:: buildstock_aws --help :ellipsis: 0,8 +The first time you run it may take several minutes to build and upload the +docker image. ``buildstock_aws`` needs to stay running and connected to the +internet while the batch simulation is running on AWS. We have found it useful +to run from an EC2 instance for convenience, but that is not strictly necessary. + AWS Specific Project configuration .................................. @@ -93,7 +98,7 @@ file, something like this: .. code-block:: yaml aws: - # The job_identifier should be unique, start with alpha, and limited to 10 chars or data loss can occur + # The job_identifier should be unique, start with alpha, and limited to 10 chars job_identifier: national01 s3: bucket: myorg-resstock @@ -101,15 +106,22 @@ file, something like this: region: us-west-2 use_spot: true batch_array_size: 10000 - # To receive email updates on job progress accept the request to receive emails that will be sent from Amazon - notifications_email: your_email@somewhere.com + dask: + n_workers: 8 + notifications_email: your_email@somewhere.com # doesn't work right now See :ref:`aws-config` for details. Cleaning up after yourself .......................... -When the simulation and postprocessing is all complete, run ``buildstock_aws ---clean your_project_file.yml``. This will clean up all the AWS resources that -were created on your behalf to run the simulations. Your results will still be -on S3 and queryable in Athena. +When the batch is done, ``buildstock_aws`` should clean up after itself. +However, if something goes wrong, the cleanup script can be run with the +``--clean`` option like so: + +:: + + buildstock_aws --clean your_project_file.yml + +This will clean up all the AWS resources that were created on your behalf to run +the simulations. Your results will still be on S3 and queryable in Athena. diff --git a/setup.py b/setup.py index 51233eb2..cd48c8a6 100644 --- a/setup.py +++ b/setup.py @@ -39,9 +39,9 @@ "fsspec", "yamale", "ruamel.yaml", - "awsretry", "lxml", "semver", + "tqdm", ], extras_require={ "dev": [ @@ -56,11 +56,14 @@ "sphinx_paramlinks", "changelog", "flake8", - "black", + "black~=24.0", "rope", "doc8", "pre-commit", - ] + ], + "aws": [ + "dask-cloudprovider[aws]", + ], }, entry_points={ "console_scripts": [