download_encrypted_upload_to_s3.py
#!/usr/bin/env python2.7
"""
Download and re-upload with encryption

Author: John Vivian
"""
import argparse
import base64
import hashlib
import os
import subprocess
from math import ceil
from urlparse import urlparse

import boto
from toil.job import Job


def parse_bucket(bucket):
    """
    Parses config file. Returns list of samples: [ [uuid1, url1], [uuid2, url2], ... ]
    """
    url = urlparse(bucket)
    if url.scheme == 's3':
        bucket = url.netloc
    else:
        bucket = url.path.lstrip('/')

    conn = boto.connect_s3()
    bucket = conn.get_bucket(bucket)

    # NOTE: this https prefix hardcodes the us-west-2 endpoint; buckets in
    # other regions would need a different prefix
    url_prefix = 'https://s3-us-west-2.amazonaws.com/'
    samples = []
    for key in bucket.list():
        # Path-style URLs must include the bucket name: endpoint/bucket/key
        samples.append((os.path.join(url_prefix, bucket.name, key.name), key.size))

    return samples
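
# Example return value from parse_bucket (bucket contents are hypothetical):
#   [('https://s3-us-west-2.amazonaws.com/bucket/sample1.bam', 12884901888),
#    ('https://s3-us-west-2.amazonaws.com/bucket/sample2.bam', 524288000)]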


def generate_unique_key(master_key_path, url):
    """
    master_key_path: str    Path to the BD2K Master Key (for S3 Encryption)
    url: str                S3 URL (e.g. https://s3-us-west-2.amazonaws.com/bucket/file.txt)

    Returns: str            32-byte unique key generated for that URL
    """
    with open(master_key_path, 'r') as f:
        master_key = f.read()
    assert len(master_key) == 32, 'Invalid key! Must be 32 characters, ' \
                                  'got length: {}'.format(len(master_key))
    new_key = hashlib.sha256(master_key + url).digest()
    assert len(new_key) == 32, 'New key is invalid and is not 32 characters: {}'.format(new_key)
    return new_key
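
# A minimal sketch of why this derivation supports later decryption
# (hypothetical key path; left as a comment so nothing runs on import):
# SHA-256 is deterministic, so the same master key and URL always reproduce
# the same per-file key, and no per-file key ever needs to be stored.
#
#   key_then = generate_unique_key('master.key', url)
#   key_now = generate_unique_key('master.key', url)
#   assert key_then == key_now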


def generate_unique_key2(master_key_path, url):
    """
    Generates unique 32-byte encryption key given a master key
    to use as a template and an S3 URL (s3://bucket/dir format)

    :param str master_key_path: Path to master key which per-file encryption key is based off of
    :param str url: S3 URL to be used as the key
    :return:
    """
    assert url.startswith('s3'), 'URL must be an s3 url: s3://bucket/example/file'
    with open(master_key_path, 'r') as f:
        master_key = f.read()
    assert len(master_key) == 32, 'Invalid Key! Must be 32 characters: {}'.format(len(master_key))
    new_key = hashlib.sha256(master_key + url).digest()
    assert len(new_key) == 32
    return new_key


def batcher(job, samples, args):
    """
    Recursively halves the sample list, spawning a binary tree of jobs so that
    no single parent job spawns too many children at once.
    """
    if len(samples) > 1:
        a = samples[len(samples)/2:]
        b = samples[:len(samples)/2]
        job.addChildJobFn(batcher, a, args)
        job.addChildJobFn(batcher, b, args)
    else:
        url, size = samples[0]
        # Request disk as a whole number of gibibytes, rounded up
        size = '{}G'.format(int(ceil(size / float(1024 ** 3))))
        job.addChildJobFn(download_encrypted_file_upload_to_s3, url, args, disk=size)
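
# Sketch of batcher's recursion for four samples (names hypothetical): the
# root halves the list, each half halves again, and every single-sample leaf
# becomes one download/re-upload job:
#   batcher([s1, s2, s3, s4]) -> batcher([s3, s4]) -> leaf(s3), leaf(s4)
#                             -> batcher([s1, s2]) -> leaf(s1), leaf(s2)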


def download_encrypted_file_upload_to_s3(job, url, args):
    """
    Downloads encrypted files from S3 via header injection

    input_args: dict    Input dictionary defined in main()
    name: str           Symbolic name associated with file
    """
    work_dir = job.fileStore.getLocalTempDir()
    file_path = os.path.join(work_dir, os.path.basename(url))

    # Download File
    key = generate_unique_key(args.ssec, url)

    encoded_key = base64.b64encode(key)
    encoded_key_md5 = base64.b64encode(hashlib.md5(key).digest())
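    # SSE-C requires the key and its MD5 digest base64-encoded in request
    # headers; S3 uses them to decrypt the object server-side before returning it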
    h1 = 'x-amz-server-side-encryption-customer-algorithm:AES256'
    h2 = 'x-amz-server-side-encryption-customer-key:{}'.format(encoded_key)
    h3 = 'x-amz-server-side-encryption-customer-key-md5:{}'.format(encoded_key_md5)
    subprocess.check_call(['curl', '-fs', '--retry', '5', '-H', h1, '-H', h2, '-H', h3, url, '-o', file_path])
    assert os.path.exists(file_path)

    # Upload
    s3_url = os.path.join('s3://', urlparse(url).path.lstrip('/'))
    with open(os.path.join(work_dir, 'temp.key'), 'wb') as f_out:
        f_out.write(generate_unique_key2(args.ssec, s3_url))
    # Upload to S3 via s3am, which streams the file back encrypted under the
    # new key; --resume picks up an interrupted multipart upload
    s3am_command = ['s3am',
                    'upload',
                    '--resume',
                    '--sse-key-file', os.path.join(work_dir, 'temp.key'),
                    'file://{}'.format(file_path),
                    s3_url]
    subprocess.check_call(s3am_command)


def main():
    """
    Re-encrypts every file in an S3 bucket: downloads each object with its
    per-URL SSE-C key, then re-uploads it under a key derived from its s3:// URL.
    """
    parser = argparse.ArgumentParser(description=main.__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--bucket', required=True, help='S3 bucket to re-encrypt')
    parser.add_argument('--ssec', required=True, help='Path to the 32-byte master key file')

    Job.Runner.addToilOptions(parser)
    args = parser.parse_args()

    samples = parse_bucket(args.bucket)
    Job.Runner.startToil(Job.wrapJobFn(batcher, samples, args), args)


if __name__ == '__main__':
    main()