Added wgs_qc_wf_tools for launching WGS QC workflows
ngaddis committed Sep 17, 2024
1 parent b5972c9 commit 1e4fbb0
Showing 5 changed files with 323 additions and 0 deletions.
31 changes: 31 additions & 0 deletions wgs_qc_wf_tools/v1.0/Dockerfile
@@ -0,0 +1,31 @@
# Use an official Python runtime as the base image
FROM python:3.12.6-slim

# Add Container Labels
LABEL maintainer="Nathan Gaddis <ngaddis@rti.org>"
LABEL description="Tools for launching WGS QC workflow"

# Install System Dependencies
RUN apt-get update && apt-get install -y \
less \
&& rm -rf /var/lib/apt/lists/*

# Set the working directory in the container
WORKDIR /opt/

# Install the required dependencies
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

# Add scripts
COPY entrypoint.sh /opt/
COPY create_step_2_config.py /opt/
COPY start_run.py /opt/

# Set permissions
RUN chmod 755 /opt/entrypoint.sh /opt/create_step_2_config.py /opt/start_run.py

# Set the entry point command
ENTRYPOINT [ "/opt/entrypoint.sh" ]
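
For context, entrypoint.sh (added below) reads its inputs from environment variables rather than command-line arguments, so a typical launch of this image looks roughly like the following sketch. The image tag, mount path, and parameter values here are illustrative placeholders, not names defined in this commit:

docker build -t wgs_qc_wf_tools:v1.0 .
docker run --rm \
    -v /path/to/configs:/data \
    -e task=launch_step_1 \
    -e aws_access_key_id=<key_id> \
    -e aws_secret_access_key=<secret_key> \
    -e parameters=/data/step_1_parameters.json \
    -e name=my_wgs_qc_run \
    wgs_qc_wf_tools:v1.0
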
75 changes: 75 additions & 0 deletions wgs_qc_wf_tools/v1.0/create_step_2_config.py
@@ -0,0 +1,75 @@
import argparse
import json
import boto3
import re
import os
import sys

# Get arguments
parser = argparse.ArgumentParser()
parser.add_argument(
    '--aws_access_key_id',
    help='AWS access key ID for profile used to run workflow',
    type=str,
    required=True
)
parser.add_argument(
    '--aws_secret_access_key',
    help='AWS secret access key for profile used to run workflow',
    type=str,
    required=True
)
parser.add_argument(
    '--step_1_output_json',
    help='S3 path to JSON file containing outputs from Step 1',
    type=str,
    required=True
)
parser.add_argument(
    '--output_dir',
    help='Directory for outputting Step 1 outputs and Step 2 config JSON',
    type=str,
    required=True
)
parser.add_argument(
    '--minimum_ancestry_sample_count',
    help='Minimum ancestry sample count for running Step 2',
    type=int,
    default=50,
    required=False
)

args = parser.parse_args()

# Create output directory if it doesn't exist
output_dir = args.output_dir if args.output_dir.endswith("/") else args.output_dir + "/"
os.makedirs(output_dir, exist_ok=True)

# Retrieve Step 1 outputs json from S3
result = re.search('s3://(.+?)/(.+)', args.step_1_output_json)
if result:
    step_1_output_bucket = result.group(1)
    step_1_output_json = result.group(2)
    local_step_1_output_json = '{}step_1_outputs.json'.format(output_dir)
    session = boto3.Session(aws_access_key_id=args.aws_access_key_id, aws_secret_access_key=args.aws_secret_access_key)
    s3 = session.resource('s3')
    my_bucket = s3.Bucket(step_1_output_bucket)
    my_bucket.download_file(step_1_output_json, local_step_1_output_json)
else:
    print("Invalid path provided for step_1_output_json")
    sys.exit(1)

# Read Step 1 outputs
with open(local_step_1_output_json) as f:
    step_1_outputs = json.load(f)

# Generate a Step 2 config for each ancestry with at least the minimum required sample count
ancestry_counts = step_1_outputs['wgs_qc_wf_step_1.counts']['sample_ancestries']
ancestry_sample_lists = step_1_outputs['wgs_qc_wf_step_1.sample_lists']['ancestries']
for count_pair, sample_list_pair in zip(ancestry_counts, ancestry_sample_lists):
    # each entry pairs a 'left' ancestry label with a 'right' value (count or sample list)
    if count_pair['right'] >= args.minimum_ancestry_sample_count:
        ancestry = sample_list_pair['left']
        step_2_config = step_1_outputs['wgs_qc_wf_step_1.step_2_parameters'].copy()
        step_2_config['ancestry_samples'] = sample_list_pair['right']
        step_2_config['output_basename'] = '{}_{}'.format(step_2_config['output_basename'], ancestry)
        step_2_config_file = '{}wgs_qc_wf_step_2_config_{}.json'.format(output_dir, ancestry)
        with open(step_2_config_file, 'w') as f:
            json.dump(step_2_config, f)
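
For reference, the script above can also be run standalone; a hypothetical invocation (bucket, key, and output directory are placeholders) would be:

python3 create_step_2_config.py \
    --aws_access_key_id "$AWS_ACCESS_KEY_ID" \
    --aws_secret_access_key "$AWS_SECRET_ACCESS_KEY" \
    --step_1_output_json s3://my-bucket/runs/step_1_outputs.json \
    --output_dir ./step_2_configs/ \
    --minimum_ancestry_sample_count 50

It downloads step_1_outputs.json into --output_dir and writes one wgs_qc_wf_step_2_config_<ancestry>.json per qualifying ancestry.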

95 changes: 95 additions & 0 deletions wgs_qc_wf_tools/v1.0/entrypoint.sh
@@ -0,0 +1,95 @@
#!/bin/bash
args_array=("$@")

# Check for required parameters
if [ -z "$task" ]; then
echo "task not provided, exiting!"
exit
fi
if [ -z "$aws_access_key_id" ]; then
echo "aws_access_key_id not provided, exiting!"
exit
fi
if [ -z "$aws_secret_access_key" ]; then
echo "aws_secret_access_key not provided, exiting!"
exit
fi
# Assign default values if parameters not provided
if [ -z "$role_arn" ]; then
role_arn="arn:aws:iam::515876044319:role/service-role/OmicsWorkflow-20240601210363"
fi
if [ -z "$output_uri" ]; then
output_uri="s3://rti-nida-iomics-oa-healthomics-output/"
fi
if [ -z "$storage_capacity" ]; then
storage_capacity=1000
fi

if ["$task" == "launch_step_1"]; then

# Check parameters and set to default if not provided where applicable
if [ -z "$parameters" ]; then
echo "parameters not provided, exiting!"
exit
fi
if [ -z "$name" ]; then
echo "name not provided, exiting!"
exit
fi
if [ -z "$workflow_id" ]; then
workflow_id="3796246"
fi

# Launch Step 1
python3 /opt/start_run.py \
--aws_access_key_id $aws_access_key_id \
--aws_secret_access_key $aws_secret_access_key \
--workflowId $workflow_id \
--parameters $parameters \
--name $name \
--roleArn $role_arn \
--outputUri $output_uri \
--storageCapacity $storage_capacity
fi

if ["$task" == "launch_step_2"]; then

if [ -z "$step_1_output_json" ]; then
echo "step_1_output_json not provided, exiting!"
exit
fi
if [ -z "$step_2_config_output_dir" ]; then
echo "step_2_config_output_dir not provided, exiting!"
exit
fi
if [ -z "$minimum_ancestry_sample_count" ]; then
minimum_ancestry_sample_count=50
fi
if [ -z "$workflow_id" ]; then
workflow_id="3796246"
fi

# Create Step 2 config files
python3 /opt/create_step_2_config.py \
--aws_access_key_id $aws_access_key_id \
--aws_secret_access_key $aws_secret_access_key \
--step_1_output_json $step_1_output_json \
--output_dir $step_2_config_output_dir \
--minimum_ancestry_sample_count $minimum_ancestry_sample_count

# Launch Step 2
step_2_config_output_dir=$(echo $step_2_config_output_dir | perl -ne 'chomp; if (substr($_, -1) eq "/") { print $_; } else { print $_."/"; }')
for step_2_config in $(ls ${step_2_config_output_dir}*step_2_config*.json); do
name=$(perl -ne 'if (/\"output_basename\"\: \"(.+?)\"/) { print $1; }' $step_2_config)
python3 /opt/start_run.py \
--aws_access_key_id $aws_access_key_id \
--aws_secret_access_key $aws_secret_access_key \
--workflowId $workflow_id \
--parameters $step_2_config \
--name $name \
--roleArn $role_arn \
--outputUri $output_uri \
--storageCapacity $storage_capacity
done

fi
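
By way of illustration, a Step 2 launch through the container sets the corresponding environment variables (values below are placeholders):

docker run --rm \
    -v /path/to/work:/data \
    -e task=launch_step_2 \
    -e aws_access_key_id=<key_id> \
    -e aws_secret_access_key=<secret_key> \
    -e step_1_output_json=s3://my-bucket/runs/step_1_outputs.json \
    -e step_2_config_output_dir=/data/step_2_configs \
    wgs_qc_wf_tools:v1.0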
1 change: 1 addition & 0 deletions wgs_qc_wf_tools/v1.0/requirements.txt
@@ -0,0 +1 @@
boto3==1.35.20
121 changes: 121 additions & 0 deletions wgs_qc_wf_tools/v1.0/start_run.py
@@ -0,0 +1,121 @@
import argparse
import boto3
from datetime import datetime
import json

# Get arguments
parser = argparse.ArgumentParser()
parser.add_argument(
    '--aws_access_key_id',
    help='AWS access key ID for profile used to run workflow',
    type=str,
    required=True
)
parser.add_argument(
    '--aws_secret_access_key',
    help='AWS secret access key for profile used to run workflow',
    type=str,
    required=True
)
parser.add_argument(
    '--workflowId',
    help='HealthOmics ID of workflow',
    type=str,
    required=True
)
parser.add_argument(
    '--parameters',
    help='JSON file with run parameters',
    type=str,
    required=True
)
parser.add_argument(
    '--name',
    help='A name for the run',
    type=str,
    required=True
)
parser.add_argument(
    '--roleArn',
    help='Service role for the run',
    type=str,
    required=True
)
parser.add_argument(
    '--outputUri',
    help='S3 path for run outputs',
    type=str,
    required=True
)
parser.add_argument(
    '--workflowType',
    help='Workflow type for run',
    type=str,
    default="PRIVATE",
    required=False,
    choices=['PRIVATE', 'PUBLIC']
)
parser.add_argument(
    '--priority',
    help='Priority for the run',
    type=int,
    default=100,
    required=False
)
parser.add_argument(
    '--storageType',
    help='Storage type for the run',
    type=str,
    default="STATIC",
    required=False,
    choices=['STATIC', 'DYNAMIC']
)
parser.add_argument(
    '--storageCapacity',
    help='Storage capacity for run in gigabytes',
    type=int,
    default=1000,
    required=False
)
parser.add_argument(
    '--logLevel',
    help='Log level for the run',
    type=str,
    default="ALL",
    required=False,
    choices=['OFF', 'FATAL', 'ERROR', 'ALL']
)
parser.add_argument(
    '--retentionMode',
    help='Retention mode for the run',
    type=str,
    default="RETAIN",
    required=False,
    choices=['RETAIN', 'REMOVE']
)
args = parser.parse_args()

# Open AWS Healthomics session
session = boto3.Session(aws_access_key_id=args.aws_access_key_id, aws_secret_access_key=args.aws_secret_access_key)
omics = session.client('omics')

# Read wf arguments
with open(args.parameters) as f:
    parameters = json.load(f)

request_id = "{}_{}".format(args.name, str(datetime.now().timestamp()))
response = omics.start_run(
    workflowId=args.workflowId,
    workflowType=args.workflowType,
    roleArn=args.roleArn,
    name=args.name,
    priority=args.priority,
    parameters=parameters,
    storageType=args.storageType,
    storageCapacity=args.storageCapacity,
    outputUri=args.outputUri,
    logLevel=args.logLevel,
    requestId=request_id,
    retentionMode=args.retentionMode
)
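
# Surface the new run's identifier so callers (e.g. entrypoint.sh) can track it.
# Note: this print is an addition here, not part of the original script; the
# boto3 omics start_run response includes fields such as 'id' and 'status'.
print("Started run {} (status: {})".format(response.get('id'), response.get('status')))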
