https://github.com/jvivian/one_off_scripts
Tip revision: 3ad04be99cd01e6a047c1b530cc8a1de82bd862e authored by John Vivian on 02 February 2017, 01:31:18 UTC
Refactor SRA pipeline to use faster method than fastq-dump
Refactor SRA pipeline to use faster method than fastq-dump
Tip revision: 3ad04be
toil_recompute.py
#!/usr/bin/env python2.7
"""
Author: John Vivian
Date: 1-9-16
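Designed for running scaling tests on the rna-seq cgl pipeline.
- Counts the samples listed in the supplied configuration file (the --sample_size size quota is parsed but not currently used)
- Rewrites the launch script with a unique AWS job-store name (UUID + date) for this run
- Launches a cgcloud Toil cluster, runs the pipeline on the leader, and collects worker metrics
- Applies termination alarms to the workers and leader, then writes a run report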
"""
import argparse
import csv
import errno
import logging
import os
import random
import subprocess
import time
from datetime import datetime
from uuid import uuid4

import boto
import boto.ec2.cloudwatch
import boto.exception

from boto_lib import get_instance_ids, get_instance_ips, get_avail_zone
from calculate_ec2_spot_instance import calculate_cost
# Emit INFO and above. basicConfig() does nothing if the root logger already has
# handlers, so the root level is also set explicitly to guarantee INFO output.
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
def fix_launch(params, region='us-west-2'):
    """
    Rewrite ``launch.sh`` in the shared directory so this run gets a unique AWS job store.

    Every line starting with ``aws`` is replaced by ``aws:<region>:<uuid>-<YYYY-MM-DD> \\``
    and every line starting with ``--s3_dir`` is replaced by ``--s3_dir toil-recompute \\``;
    all other lines are copied unchanged. The rewritten script replaces the original
    and is given execute permission for user, group and other.

    params: argparse.Namespace  Input arguments; only ``shared_dir`` is read
    region: str                 AWS region written into the job-store locator
    Returns: uuid.UUID          The UUID embedded in the job-store name
    """
    logging.info('Fixing launch script aws bucket name')
    uuid = uuid4()
    # Same 'YYYY-MM-DD' string as str(datetime.utcnow()).split()[0]
    date = datetime.utcnow().date().isoformat()
    launch_path = os.path.join(params.shared_dir, 'launch.sh')
    fixed_path = os.path.join(params.shared_dir, 'fixed.sh')
    with open(launch_path, 'r') as f_in, open(fixed_path, 'w') as f_out:
        for line in f_in:
            if line.startswith('aws'):
                f_out.write('aws:{}:{}-{} \\\n'.format(region, uuid, date))
            elif line.startswith('--s3_dir'):
                f_out.write('--s3_dir toil-recompute \\\n')
            else:
                f_out.write(line)
    os.remove(launch_path)
    os.rename(fixed_path, launch_path)
    logging.info('Fixing execution privileges of bash script.')
    st = os.stat(launch_path)
    # 0o111 = execute bit for user, group and other; the 0o prefix is valid on Python 2.6+ and 3
    os.chmod(launch_path, st.st_mode | 0o111)
    return uuid
def launch_cluster(params):
    """
    Create a Toil cluster with ``cgcloud create-cluster``.

    The leader is an m3.medium launched with ``--leader-on-demand``; workers are
    ``params.cluster_size`` instances of ``params.instance_type`` with a spot bid of
    ``params.spot_price``. ``params.shared_dir`` is shared to the cluster and the
    cluster is named ``params.cluster_name``. SSH host-key checking is disabled.

    params: argparse.Namespace Input arguments
    Raises subprocess.CalledProcessError if cgcloud exits non-zero.
    """
    logging.info('Launching cluster of size: {} and type: {}'.format(params.cluster_size, params.instance_type))
    ssh_options = '-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no'
    command = ['cgcloud', 'create-cluster']
    command += ['--leader-instance-type', 'm3.medium']
    command += ['--instance-type', params.instance_type]
    command += ['--share', params.shared_dir]
    command += ['--num-workers', str(params.cluster_size)]
    command += ['-c', params.cluster_name]
    command += ['--spot-bid', str(params.spot_price)]
    command += ['--leader-on-demand', '--ssh-opts', ssh_options, 'toil']
    subprocess.check_call(command)
def launch_pipeline(params):
    """
    Run ``/home/mesosbox/shared/launch.sh`` on the Toil leader over SSH and block until it exits.

    The leader IP is the first instance matching the cluster name and the
    ``<namespace>_toil-leader`` instance name. The remote script's stdout and stderr
    are redirected to ``log.txt`` on the leader (presumably in the mesosbox home
    directory — the remote working directory is not set here).

    params: argparse.Namespace Input arguments (reads ``cluster_name`` and ``namespace``)
    A non-zero exit of the remote command is logged as an error rather than raised,
    so the caller continues afterwards.
    """
    leader_ip = get_instance_ips(filter_cluster=params.cluster_name, filter_name=params.namespace + '_toil-leader')[0]
    logging.info('Launching Pipeline and blocking. Check log.txt on leader for stderr and stdout')
    # argv list: no local shell is involved. ssh joins the trailing argument into the remote
    # command line, so the remote shell still sees "launch.sh >& log.txt" exactly as before.
    # NOTE(review): ">&" assumes the remote login shell is bash/csh-compatible.
    command = ['ssh', '-o', 'StrictHostKeyChecking=no', 'mesosbox@{}'.format(leader_ip),
               '/home/mesosbox/shared/launch.sh >& log.txt']
    try:
        subprocess.check_call(command)
    except subprocess.CalledProcessError as e:
        logging.error('Pipeline exited prematurely: {}'.format(e))
def _mkdir_p(path):
    """Create ``path`` and any missing parents; do nothing if it is already a directory."""
    try:
        os.makedirs(path)
    except OSError as e:
        # Python 2's os.makedirs has no exist_ok, so tolerate EEXIST only for an existing directory.
        if e.errno != errno.EEXIST or not os.path.isdir(path):
            raise


def _fetch_averages(conn, metric, instance_id, start, stop, period=300, page=4 * 24 * 3600):
    """
    Return the 'Average' statistic of one metric for one instance, oldest first.

    AWS limits a single request to 1,440 datapoints (5 days at 5-minute resolution), so
    [start, stop) is requested in windows of ``page`` seconds (4 days by default, to be safe).

    conn: boto.ec2.cloudwatch.CloudWatchConnection
    metric: str         '<Namespace>/<MetricName>', e.g. 'AWS/EC2/CPUUtilization'
    instance_id: str    EC2 instance ID, used as the InstanceId dimension
    start, stop: float  Epoch seconds bounding the collection window
    period: int         Datapoint granularity in seconds
    page: int           Width of each request window in seconds
    Raises boto.exception.BotoServerError if a CloudWatch request fails.
    """
    namespace, name = metric.rsplit('/', 1)
    datapoints = []
    s = start
    while s < stop:
        e = min(s + page, stop)  # never request past the end of the run
        datapoints.extend(conn.get_metric_statistics(period,
                                                     datetime.utcfromtimestamp(s),
                                                     datetime.utcfromtimestamp(e),
                                                     name,
                                                     namespace,
                                                     ['Average'],
                                                     dimensions={'InstanceId': instance_id}))
        s = e
    # CloudWatch does not guarantee chronological order, so sort before flattening.
    datapoints.sort(key=lambda d: d['Timestamp'])
    return [d['Average'] for d in datapoints]


def collect_metrics(params, start, uuid=None, stop=None, region='us-west-2'):
    """
    Collect CloudWatch metrics for every worker in the cluster and save them as CSV files.

    For each metric, one CSV row is written per worker that returned datapoints, holding
    that worker's 'Average' values in time order. Files go to
    ``<uuid>_<YYYY-MM-DD>/<MetricName>.csv``; the directory is created even when no
    metrics were collected, because the run report is written into it as well.

    params: argparse.Namespace  Input arguments (reads ``cluster_name`` and ``namespace``)
    start: float                time.time() of the start point
    uuid: str or uuid.UUID      Run identifier used in the output directory name;
                                a fresh UUID4 is generated when omitted
    stop: float                 time.time() of the stop point; defaults to the call time
    region: str                 AWS region the metrics are collected from
    Returns: str                The output directory path
    Raises ValueError if boto does not recognise ``region``.
    """
    list_of_metrics = ['AWS/EC2/CPUUtilization',
                       'CGCloud/MemUsage',
                       'CGCloud/DiskUsage_mnt_ephemeral',
                       'CGCloud/DiskUsage_root',
                       'AWS/EC2/NetworkIn',
                       'AWS/EC2/NetworkOut',
                       'AWS/EC2/DiskWriteOps',
                       'AWS/EC2/DiskReadOps']
    # Generate per call; a default of str(uuid4()) would be fixed once at import time.
    if uuid is None:
        uuid = uuid4()
    if stop is None:
        stop = time.time()
    ids = get_instance_ids(filter_cluster=params.cluster_name, filter_name=params.namespace + '_toil-worker')
    conn = boto.ec2.cloudwatch.connect_to_region(region)
    if conn is None:
        raise ValueError('Unknown CloudWatch region: {}'.format(region))
    metrics = dict((metric, []) for metric in list_of_metrics)
    for instance_id in ids:
        for metric in list_of_metrics:
            try:
                averages = _fetch_averages(conn, metric, instance_id, start, stop)
            except boto.exception.BotoServerError as e:
                # Best effort: skip this metric for this instance and keep collecting the rest.
                logging.warning('Could not collect {} for instance {}: {}'.format(metric, instance_id, e))
                continue
            if averages:
                metrics[metric].append(averages)
                logging.info('# of Datapoints for metric {} is {}'.format(metric, len(averages)))
    # Remove metrics if no datapoints were collected
    metrics = dict((k, v) for k, v in metrics.items() if v)
    # Save CSV of data; compute the date once so every file lands in the same directory
    out_dir = '{}_{}'.format(uuid, datetime.utcnow().date().isoformat())
    _mkdir_p(out_dir)
    for metric, rows in metrics.items():
        # 'wb' is the Python 2 csv idiom (this script targets python2.7)
        with open(os.path.join(out_dir, metric.rsplit('/', 1)[1] + '.csv'), 'wb') as f:
            csv.writer(f).writerows(rows)
    return out_dir
def _apply_alarm_to_instance(instance_id, threshold=0.5, region='us-west-2'):
    """
    Attach a CloudWatch alarm that terminates an EC2 instance once it is idle.

    The alarm fires when the instance's average CPUUtilization over one 300-second
    period is below ``threshold`` percent; its only action is EC2 terminate.

    instance_id: str   EC2 instance ID
    threshold: float   CPU utilization (%) below which the instance is terminated.
                       NOTE(review): the helper this script referenced is not defined in
                       this file; the 0.5 default for workers is an assumption — confirm.
    region: str        AWS region of the instance. NOTE(review): assumed to match the
                       us-west-2 job store written by fix_launch — confirm.
    Raises ValueError if boto does not recognise ``region``.
    """
    logging.info('Applying termination alarm to instance: {}'.format(instance_id))
    conn = boto.ec2.cloudwatch.connect_to_region(region)
    if conn is None:
        raise ValueError('Unknown CloudWatch region: {}'.format(region))
    alarm = boto.ec2.cloudwatch.MetricAlarm(
        name='CPUAlarm_{}'.format(instance_id),
        description='Terminate instance after low CPU.',
        namespace='AWS/EC2',
        metric='CPUUtilization',
        statistic='Average',
        comparison='<',
        threshold=threshold,
        period=300,
        evaluation_periods=1,
        dimensions={'InstanceId': [instance_id]},
        alarm_actions=['arn:aws:automate:{}:ec2:terminate'.format(region)])
    conn.put_metric_alarm(alarm)


def main():
    """
    Automation script for running scaling tests for Toil Recompute
    """
    # The docstring above doubles as the --help description.
    # Sequence: count samples -> rewrite launch.sh -> create cluster -> run pipeline ->
    # collect worker metrics -> attach termination alarms -> write run report.
    parser = argparse.ArgumentParser(description=main.__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--config', required=True, help='Configuration file for run. Must be in shared_dir')
    parser.add_argument('-c', '--cluster_size', required=True, type=int,
                        help='Number of workers desired in the cluster.')
    parser.add_argument('-s', '--sample_size', required=True, type=float, help='Size of the sample desired in TB.')
    parser.add_argument('-t', '--instance_type', default='c3.8xlarge', help='e.g. m4.large or c3.8xlarge.')
    parser.add_argument('-n', '--cluster_name', required=True, help='Name of cluster.')
    parser.add_argument('--namespace', default='jtvivian', help='CGCloud NameSpace')
    parser.add_argument('--spot_price', default=0.60, help='Change spot price of instances')
    parser.add_argument('-b', '--bucket', default='tcga-data-cgl-recompute', help='Bucket where data is.')
    parser.add_argument('-d', '--shared_dir', required=True,
                        help='Full path to directory with: pipeline script, launch script, config, and master key.')
    params = parser.parse_args()
    worker_name = params.namespace + '_toil-worker'
    leader_name = params.namespace + '_toil-leader'

    # Run sequence
    start = time.time()
    # Number of samples = non-blank config lines (assumes one sample per line)
    with open(params.config, 'r') as f:
        num_samples = sum(1 for line in f if line.strip())

    # Launch cluster and pipeline
    uuid = fix_launch(params)
    launch_cluster(params)
    ids = get_instance_ids(filter_cluster=params.cluster_name, filter_name=worker_name)
    launch_pipeline(params)  # blocks until the remote launch.sh exits
    stop = time.time()

    # Collect metrics from cluster
    collect_metrics(params, start, uuid=uuid)

    # Apply "Insta-kill" alarm to every worker (explicit loop: map() is lazy on Python 3)
    for instance_id in ids:
        _apply_alarm_to_instance(instance_id)
    # Kill leader
    logging.info('Killing Leader')
    leader_id = get_instance_ids(filter_cluster=params.cluster_name, filter_name=leader_name)[0]
    _apply_alarm_to_instance(leader_id, threshold=5)

    # Generate Run Report
    avail_zone = get_avail_zone(filter_cluster=params.cluster_name, filter_name=worker_name)[0]
    total_cost, avg_hourly_cost = calculate_cost(params.instance_type, ids[0], avail_zone)
    cluster_cost = float(total_cost) * params.cluster_size
    # Guard against an empty config rather than raising ZeroDivisionError at the very end
    cost_per_sample = cluster_cost / num_samples if num_samples else 'N/A'
    output = ['UUID: {}'.format(uuid),
              'Number of Samples: {}'.format(num_samples),
              'Number of Nodes: {}'.format(params.cluster_size),
              'Cluster Name: {}'.format(params.cluster_name),
              'Source Bucket: {}'.format(params.bucket),
              'Average Hourly Cost: ${}'.format(avg_hourly_cost),
              'Cost per Instance: ${}'.format(total_cost),
              'Availability Zone: {}'.format(avail_zone),
              'Start Time: {}'.format(datetime.isoformat(datetime.utcfromtimestamp(start))),
              'Stop Time: {}'.format(datetime.isoformat(datetime.utcfromtimestamp(stop))),
              'Total Cost of Cluster: ${}'.format(cluster_cost),
              'Cost Per Sample: ${}'.format(cost_per_sample)]
    # Same '<uuid>_<YYYY-MM-DD>' directory name as the metrics output; create it if it does not
    # exist (e.g. the UTC date rolled over since metrics were written).
    run_dir = '{}_{}'.format(uuid, datetime.utcnow().date().isoformat())
    if not os.path.isdir(run_dir):
        os.makedirs(run_dir)
    with open(os.path.join(run_dir, 'run_report.txt'), 'w') as f:
        f.write('\n'.join(output))
    # You're done!
    logging.info('\n\nScaling Test Complete.')
# Run the scaling test only when executed as a script, not when imported.
if __name__ == '__main__':
    main()