https://github.com/jvivian/one_off_scripts
toil_uber_script.py
#!/usr/bin/env python2.7
"""
Author: John Vivian
Date: 2-9-16
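
Example invocations (argument values are illustrative):
    python toil_uber_script.py create-config -s 1.0 -S /path/to/shared/dir
    python toil_uber_script.py launch-cluster -s 32 -c my-cluster -S /path/to/shared/dir
    python toil_uber_script.py launch-pipeline -c my-cluster -b my-output-bucket
    python toil_uber_script.py launch-metrics -c my-cluster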
"""
import logging
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)-15s:%(levelname)s:%(name)s:%(message)s',
                    datefmt='%m-%d %H:%M:%S')

import argparse
from collections import namedtuple
import csv
import os
import random
import subprocess
import boto
from boto.exception import BotoServerError, EC2ResponseError
import boto.ec2.cloudwatch
import time
from uuid import uuid4
from tqdm import tqdm
import errno
from boto_lib import get_instance_ids
from datetime import datetime, timedelta

metric_endtime_margin = timedelta(hours=1)
metric_initial_wait_period_in_seconds = 0
metric_collection_interval_in_seconds = 1800
metric_start_time_margin = 1800


def create_config(params):
    """
    Creates a configuration file from a random selection of samples whose combined size totals the requested sample size.

    params: argparse.Namespace      Input arguments
    """
    log.info('Creating Configuration File')
    # Acquire keys from bucket
    conn = boto.connect_s3()
    bucket = conn.get_bucket(params.bucket)
    keys = [x for x in bucket.list()]
    random.shuffle(keys)
    log.info('Choosing subset from {} total samples'.format(len(keys)))
    # Collect random samples until specified limit is reached
    total = 0
    samples = []
    while total <= params.sample_size and keys:  # stop early if the bucket runs out of keys
        key = keys.pop()
        samples.append(key)
        total += key.size * 1.0 / (1024 ** 4)
    log.info('{} samples selected, totaling {:.2f} TB (requested {} TB).'.format(len(samples), total, params.sample_size))
    # Write out config
    with open(os.path.join(params.share, 'config.txt'), 'w') as f:
        prefix = 'https://s3-us-west-2.amazonaws.com'
        for key in samples:
            name = key.name.split('/')[-1]
            f.write(name.split('.')[0] + ',' + os.path.join(prefix, params.bucket, name) + '\n')
    log.info('Number of samples selected is: {}'.format(len(samples)))


def launch_cluster(params):
    """
    Launches a toil cluster of size N, with shared dir S, of instance type I, at a spot bid of B

    params: argparse.Namespace      Input arguments
    """
    log.info('Launching cluster of size: {} and type: {}'.format(params.num_workers, params.instance_type))
    subprocess.check_call(['cgcloud',
                           'create-cluster',
                           '--leader-instance-type', params.leader_type,
                           '--instance-type', params.instance_type,
                           '--share', params.share,
                           '--num-workers', str(params.num_workers),
                           '--cluster-name', params.cluster_name,
                           '--spot-bid', str(params.spot_bid),
                           '--leader-on-demand',
                           '--num-threads', str(params.num_workers),
                           '--zone', str(params.zone),
                           '--ssh-opts',
                           '-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no',
                           'toil'])


def place_boto_on_leader(params):
    """
    Copies a local .boto file to the toil-leader so that credentials do not time out mid-run.

    params: argparse.Namespace      Input arguments
    """
    log.info('Adding a .boto to leader to avoid credential timeouts.')
    subprocess.check_call(['cgcloud', 'rsync', '--cluster-name', params.cluster_name,
                           '--ssh-opts=-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no',
                           'toil-leader', params.boto_path, ':'])


def launch_rnaseq_pipeline(params):
    """
    Launches the pipeline on the toil-leader inside a screen session named after the cluster

    params: argparse.Namespace      Input arguments
    """

    jobstore = params.jobstore if params.jobstore else '{}-{}'.format(uuid4(), str(datetime.utcnow().date()))
    wiggle = '--wiggle' if params.wiggle else ''
    save_bam = '--save_bam' if params.save_bam else ''
    restart = '--restart' if params.restart else ''
    log.info('Launching pipeline in a detached screen session. Check log.txt on the leader for stderr and stdout')
    try:
        # Create screen session
        subprocess.check_call(['cgcloud', 'ssh', '--cluster-name', params.cluster_name, 'toil-leader',
                               '-o', 'UserKnownHostsFile=/dev/null', '-o', 'StrictHostKeyChecking=no',
                               'screen', '-dmS', params.cluster_name])
        # Run command on screen session
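        # screen's 'stuff' command types the quoted pipeline invocation (with its trailing newline) into the detached session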
        subprocess.check_call(['cgcloud', 'ssh', '--cluster-name', params.cluster_name, 'toil-leader',
                               '-o', 'UserKnownHostsFile=/dev/null', '-o', 'StrictHostKeyChecking=no',
                               'screen', '-S', params.cluster_name, '-X', 'stuff',
                               '"python /home/mesosbox/shared/rnaseq_cgl_pipeline.py \
                                aws:us-west-2:{0} \
                                --config /home/mesosbox/shared/config.txt \
                                --retryCount 2 \
                                --ssec /home/mesosbox/shared/master.key \
                                --s3_dir {1} \
                                --sseKey=/home/mesosbox/shared/master.key \
                                --batchSystem="mesos" \
                                --mesosMaster mesos-master:5050 \
                                --workDir=/var/lib/toil \
                                {2} \
                                {3} \
                                {4} >& log.txt\n"'.format(jobstore, params.bucket, wiggle,
                                                          save_bam, restart)])
    except subprocess.CalledProcessError as e:
        log.info('Pipeline launch exited with a non-zero status code: {}'.format(e))


def get_metric(cw, metric, instance_id, start, stop):
    """
    Returns metric statistics associated with a particular instance ID.

    cw: CloudWatchConnection    Open boto CloudWatch connection
    metric: str                 Metric to collect, in the form '<namespace>/<metric_name>'
    instance_id: str            Instance ID
    start: datetime             UTC start of the collection window
    stop: datetime              UTC end of the collection window
    :return: list of datapoint dicts
    """
    namespace, metric_name = metric.rsplit('/', 1)
    metric_object = cw.get_metric_statistics(namespace=namespace,
                                             metric_name=metric_name,
                                             dimensions={'InstanceId': instance_id},
                                             start_time=start,
                                             end_time=stop,
                                             period=300,
                                             statistics=['Average'])
    return metric_object
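
# Example use of get_metric (hypothetical instance ID; assumes AWS credentials are already configured):
#   cw = boto.ec2.cloudwatch.connect_to_region('us-west-2')
#   points = get_metric(cw, 'AWS/EC2/CPUUtilization', 'i-0123456789abcdef0',
#                       datetime.utcnow() - timedelta(hours=1), datetime.utcnow())
#   Each returned datapoint is a dict containing at least 'Timestamp' and 'Average'.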


def collect_realtime_metrics(params, threshold=0.5, region='us-west-2'):
    """
    Collects metrics from AWS worker instances at regular intervals
    (metric_collection_interval_in_seconds). Instances whose CPU has gone idle
    (max average below the threshold value) are terminated.

    params: argparse.Namespace      Input arguments
    threshold: float                CPU utilization below which an instance is considered idle
    region: str                     AWS region metrics are being collected from
    """
    list_of_metrics = ['AWS/EC2/CPUUtilization',
                       'CGCloud/MemUsage',
                       'CGCloud/DiskUsage_mnt_ephemeral',
                       'CGCloud/DiskUsage_root',
                       'AWS/EC2/NetworkIn',
                       'AWS/EC2/NetworkOut',
                       'AWS/EC2/DiskWriteOps',
                       'AWS/EC2/DiskReadOps']

    # Create output directory
    uuid = str(uuid4())
    date = str(datetime.utcnow().date())
    dir_path = '{}_{}_{}'.format(params.cluster_name, uuid, date)
    mkdir_p(dir_path)

    start = time.time() - metric_start_time_margin

    # Create connections to ec2 and cloudwatch
    conn = boto.ec2.connect_to_region(region)
    cw = boto.ec2.cloudwatch.connect_to_region(region)
    # Create initial variables
    start = datetime.utcfromtimestamp(start)
    DataPoint = namedtuple('datapoint', ['instance_id', 'value', 'timestamp'])
    timestamps = {}
    # Begin loop
    log.info('Metric collection has started. '
             'Waiting {} seconds before initial collection.'.format(metric_initial_wait_period_in_seconds))
    time.sleep(metric_initial_wait_period_in_seconds)
    while True:
        ids = get_instance_ids(filter_cluster=params.cluster_name, filter_name=params.namespace + '_toil-worker')
        if not ids:
            break
        metric_collection_time = time.time()
        try:
            for instance_id in tqdm(ids):
                kill_instance = False
                for metric in list_of_metrics:
                    datapoints = []
                    aws_start = timestamps.get(instance_id, start)
                    aws_stop = datetime.utcnow() + metric_endtime_margin
                    metric_object = get_metric(cw, metric, instance_id, aws_start, aws_stop)
                    for datum in metric_object:
                        d = DataPoint(instance_id=instance_id, value=datum['Average'], timestamp=datum['Timestamp'])
                        datapoints.append(d)
                    # Save data in local directory
                    if datapoints:
                        datapoints = sorted(datapoints, key=lambda x: x.timestamp)
                        with open(os.path.join(dir_path, '{}.tsv'.format(os.path.basename(metric))), 'a') as f:
                            writer = csv.writer(f, delimiter='\t')
                            writer.writerows(datapoints)
                    # Check if instance's CPU has been idle the last 20 minutes.
                    if metric == 'AWS/EC2/CPUUtilization':
                        averages = [x.value for x in sorted(datapoints, key=lambda x: x.timestamp)][-4:]
                        # If there is at least 20 minutes of data points and max is below threshold, flag to be killed.
                        if len(averages) == 4:
                            if max(averages) < threshold:
                                kill_instance = True
                                log.info('Flagging {} to be killed. '
                                         'Max CPU {} over the last 20 minutes.'.format(instance_id, max(averages)))
                # Kill instance if idle
                if kill_instance:
                    try:
                        log.info('Terminating Instance: {}'.format(instance_id))
                        conn.terminate_instances(instance_ids=[instance_id])
                    except (EC2ResponseError, BotoServerError) as e:
                        log.info('Error terminating instance: {}\n{}'.format(instance_id, e))
                # Set start point to be last collected timestamp
                timestamps[instance_id] = max(x.timestamp for x in datapoints) if datapoints else start
        except BotoServerError:
            log.error('Giving up trying to fetch metric for this interval')
        # Sleep
        collection_time = time.time() - metric_collection_time
        log.info('Metric collection took: {} seconds. '
                 'Waiting {} seconds.'.format(collection_time, metric_collection_interval_in_seconds))
        wait_time = metric_collection_interval_in_seconds - collection_time
        if wait_time < 0:
            log.warning('Collection time exceeded metric collection interval by: %i', -wait_time)
        else:
            time.sleep(wait_time)
    log.info('Metric collection has finished.')


def mkdir_p(path):
    """
    It is Easier to Ask for Forgiveness than Permission
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise


def main():
    """
    Author: John Vivian (jtvivian@gmail.com)

    This script was used for the Toil recompute to:
        - Launch the initial cluster
        - Launch the CGL RNA-seq pipeline
        - Collect metrics
        - Terminate idle instances
    """
    parser = argparse.ArgumentParser(description=main.__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    subparsers = parser.add_subparsers(dest='command')

    # Create Config
    parser_config = subparsers.add_parser('create-config', help='Creates config.txt based on sample size')
    parser_config.add_argument('-s', '--sample-size', required=True, type=float,
                               help='Size of the sample set desired, in TB.')
    parser_config.add_argument('-b', '--bucket', default='tcga-data-cgl-recompute',
                               help='Source bucket to pull data from.')
    parser_config.add_argument('-S', '--share', required=True, help='Directory to save config.txt')

    # Launch Cluster
    parser_cluster = subparsers.add_parser('launch-cluster', help='Launches AWS cluster via CGCloud')
    parser_cluster.add_argument('-s', '--num-workers', required=True, help='Number of workers desired in the cluster.')
    parser_cluster.add_argument('-c', '--cluster-name', required=True, help='Name of cluster.')
    parser_cluster.add_argument('-S', '--share', required=True,
                                help='Full path to directory: pipeline script, launch script, config, and master key.')
    parser_cluster.add_argument('--spot-bid', default=1.00, help='Spot bid price (in dollars) for worker instances.')
    parser_cluster.add_argument('-t', '--instance-type', default='c3.8xlarge',
                                help='Worker instance type, e.g. m4.large or c3.8xlarge.')
    parser_cluster.add_argument('-T', '--leader-type', default='m3.medium', help='Sets leader instance type.')
    parser_cluster.add_argument('-b', '--boto-path', default='/home/mesosbox/.boto', type=str,
                                help='Path to local .boto file to be placed on leader.')
    parser_cluster.add_argument('-z', '--zone', default='us-west-2a', help='Zone to launch cluster in')

    # Launch Pipeline
    parser_pipeline = subparsers.add_parser('launch-pipeline', help='Launches pipeline')
    parser_pipeline.add_argument('-c', '--cluster-name', required=True, help='Name of cluster.')
    parser_pipeline.add_argument('-b', '--bucket', required=True, help='Set destination bucket.')
    parser_pipeline.add_argument('-j', '--jobstore', default=None,
                                 help='Name of jobstore. Defaults to UUID-Date if not set')
    parser_pipeline.add_argument('--restart', action='store_true',
                                 help='Attempts to restart pipeline, requires existing jobstore.')
    parser_pipeline.add_argument('-w', '--wiggle', action='store_true', help='Saves BedGraph files from STAR')
    parser_pipeline.add_argument('-s', '--save-bam', action='store_true', help='Saves BAM from run')

    # Launch Metric Collection
    parser_metric = subparsers.add_parser('launch-metrics', help='Launches metric collection thread')
    parser_metric.add_argument('-c', '--cluster-name', required=True, help='Name of cluster.')
    parser_metric.add_argument('--namespace', default='jtvivian', help='CGCloud namespace')

    # Parse args
    params = parser.parse_args()

    # Modular Run Sequence
    if params.command == 'create-config':
        create_config(params)
    elif params.command == 'launch-cluster':
        launch_cluster(params)
        place_boto_on_leader(params)
    elif params.command == 'launch-pipeline':
        launch_rnaseq_pipeline(params)
    elif params.command == 'launch-metrics':
        collect_realtime_metrics(params)


if __name__ == '__main__':
    main()