Revision ba0dc5ddaad56dea2d6207a14b2087832e8d2211 authored by Collin Capano on 16 September 2017, 22:10:52 UTC, committed by Soumi De on 16 September 2017, 22:10:52 UTC
* first pass at adding CustomTransform class

* add functions

* add from config, remove inverse for now

* add waveform transforms to likelihood

* add waveform transforms to pycbc_inference

* fix bugs

* fix more bugs

* fix doc
1 parent 5c16927
pycbc_create_sbank_workflow
#!/usr/bin/env python

# Copyright (C) 2016 Ian W. Harry, Y Ddraig Goch
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
Workflow generator for the lalapps_cbc_sbank template bank generation.
This is intended to be self-contained: classes such as SbankExecutable are
defined here rather than in the pycbc.workflow module, to illustrate how a
simple workflow is constructed with pycbc.workflow.
"""

# imports
from __future__ import division
import os
import argparse
import pycbc
import pycbc.version
import pycbc.workflow as wf
import pycbc.workflow.pegasus_workflow as pwf

# Boiler-plate stuff
__author__  = "Ian Harry <ian.harry@ligo.org>"
__version__ = pycbc.version.git_verbose_msg
__date__    = pycbc.version.date
__program__ = "pycbc_create_sbank_workflow"

# We define classes for all executables used in the workflow

class SbankExecutable(wf.Executable):
    """ Class for running lalapps_cbc_sbank
    """
    # This can be altered if you don't always want to store output files
    current_retention_level = wf.Executable.FINAL_RESULT

    # This tells us that reference-psd is a file option
    file_input_options = ['--reference-psd']

    sbank_job_seed = 0

    def create_node(self, analysis_time, seed_bank=None, trial_bank=None,
                     mchirp_boundaries_file=None, mchirp_boundary_idx=None,
                     extra_tags=None):
        if extra_tags is None:
            extra_tags = []
        node = wf.Executable.create_node(self)
        # Most options are specified in the config file. In some cases though,
        # for example input/output files, options are specified directly in
        # the create_node function. *DO NOT* specify these in the config file.

        # The seed must be unique for each job and reproducible
        node.add_opt('--seed', str(self.sbank_job_seed))
        SbankExecutable.sbank_job_seed += 1

        # These input files are optional. If given, add them
        if seed_bank is not None:
            node.add_input_opt('--bank-seed', seed_bank)
        if trial_bank is not None:
            node.add_input_opt('--trial-waveforms', trial_bank)
        if mchirp_boundaries_file is not None:
            node.add_input_opt('--mchirp-boundaries-file',
                               mchirp_boundaries_file)
            # The boundaries file option also requires the boundary idx
            assert(mchirp_boundary_idx is not None)
            node.add_opt('--mchirp-boundaries-index', mchirp_boundary_idx)

        # Here we add the output file, but we are letting pycbc.workflow
        # handle how to name the file
        node.new_output_file_opt(analysis_time, '.h5',
                              '--output-filename', tags=self.tags + extra_tags)
        return node

class SbankChooseMchirpBinsExecutable(wf.Executable):
    """ Class for running lalapps_cbc_sbank_choose_mchirp_boundaries
    """
    current_retention_level = wf.Executable.ALL_TRIGGERS

    def create_node(self, analysis_time, input_file, nbanks):
        node = wf.Executable.create_node(self)

        # Here we add the output file
        node.new_output_file_opt(analysis_time, '.txt',
                                 '--output-file', tags=self.tags)

        # And the input file, which is an argument, not an option
        node.add_input_arg(input_file)

        # nbanks is just a normal option, but as it affects the workflow
        # structure, it is supplied here and not directly in the config file
        node.add_opt('--nbanks', nbanks)

        return node

# There is already a ligolw_add executable (wf.LigolwAddExecutable), but it
# needs a minor change because we are potentially dealing with sub-daxes here.
class LigolwAddExecutable(wf.LigolwAddExecutable):

    def create_node(self, jobSegment, input_files, output=None,
                    use_tmp_subdirs=True, tags=None):
        if output is not None:
            # Convert path to file
            out_file = wf.File.from_path(output)
            if self.retain_files:
                if not os.path.isabs(output):
                    out_file.storage_path = os.path.join(self.out_dir,
                                                         output)
                else:
                    out_file.storage_path = output
        else:
            out_file = output

        return super(LigolwAddExecutable, self).create_node(
            jobSegment, input_files, output=out_file,
            use_tmp_subdirs=use_tmp_subdirs, tags=tags)

class CombineHDFBanksExecutable(wf.Executable):
    """ Class for running a combination of hdf banks
    """
    current_retention_level = wf.Executable.ALL_TRIGGERS

    def create_node(self, analysis_time, input_file_list, output=None,
                    tags=None):
        node = wf.Executable.create_node(self)

        # Here we add the input files
        node.add_input_list_opt('--input-filenames', input_file_list)

        curr_tags = self.tags
        if tags is not None:
            curr_tags += tags

        # Output file
        if output is not None:
            # Convert path to file
            out_file = wf.File.from_path(output)
            if self.retain_files:
                if not os.path.isabs(output):
                    out_file.storage_path = os.path.join(self.out_dir,
                                                         output)
                else:
                    out_file.storage_path = output
            node.add_output_opt('--output-file', out_file)
        else:
            node.new_output_file_opt(analysis_time, '.h5',
                                     '--output-file', tags=curr_tags)

        return node


##############################################################################
# Argument parsing and setup of workflow                                     #
##############################################################################


# Use the standard workflow command-line parsing routines. Things like a 
# configuration file are specified within the "workflow command line group"
# so run this with --help to see what options are added.
_desc = __doc__[1:]
parser = argparse.ArgumentParser(description=_desc)
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument("--workflow-name", type=str, default='sbank_workflow',
                    help="Descriptive name of the analysis.")
parser.add_argument("-d", "--output-dir", default=None,
                    help="Path to output directory.")
parser.add_argument("--output-file", type=str, default=None,
                    help="Specify the output file name. Either a name can be "
                         "provided or a full path to file. Is this is not "
                         "given a filename and location is chosen "
                         "automatically.")
parser.add_argument("--dax-filename", type=str, default=None,
                    help="This can be used if running this job in a "
                         "sub-workflow to specify the dax filename.")
parser.add_argument("--map-filename", type=str, default=None,
                    help="This can be used if running this job in a "
                         "sub-workflow to specify the output map filename. "
                         "WARNING: Giving this if not running as a "
                         "sub-workflow will cause pycbc_submit_dax to not "
                         "work.")
parser.add_argument("--is-sub-workflow", default=False, action="store_true",
                    help="Only give this option if this code is being run "
                    "as a sub-workflow within pegasus. If this means nothing "
                    "to you, do not give this option.")
parser.add_argument("--tags", default=[], nargs="*", action="store",
                    help="If this option is given all jobs, and all workflow "
                          "configuration options, will use the tags given "
                          "here. This can be used if running this as a "
                          "sub-workflow to give two sets of options to two "
                          "different invocations of the sbank workflow.")


wf.add_workflow_command_line_group(parser)
args = parser.parse_args()
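
# An illustrative invocation (names and paths are hypothetical; the
# configuration-file option is one of those added by the workflow command
# line group above, run with --help to see the exact set):
#   pycbc_create_sbank_workflow --workflow-name sbank --output-dir output \
#       --config-files sbank_workflow.ini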

# Create the workflow object
workflow = wf.Workflow(args, args.workflow_name)

if not args.is_sub_workflow:
    wf.makedir(args.output_dir)
    os.chdir(args.output_dir)
    args.output_dir = '.'

##############################################################################
# Do I have a seed bank provided and will this be used for chirp mass        #
# boundaries? If not then add the COARSE job.                                #
##############################################################################

seed_file = None
bins_inp_file = None
if workflow.cp.has_option_tags('workflow', 'seed-bank', args.tags):
    # If a seed bank is provided, register it as a File object
    seed_banks = workflow.cp.get_opt_tags('workflow', 'seed-bank', args.tags)
    seed_banks = seed_banks.split()
    if len(seed_banks) == 0:
        raise ValueError("No seed bank actually provided!")
    seed_files = []
    for seed_bank in seed_banks:
        if not args.is_sub_workflow:
            seed_file = wf.File.from_path(seed_bank)
        else:
            seed_file = pwf.File(os.path.basename(seed_bank))
        seed_files.append(seed_file)
    if len(seed_files) == 1:
        seed_file = seed_files[0]
    else:
        # Combine with h5add
        out_dir = os.path.join(args.output_dir, 'input_combine')
        h5add_exe = CombineHDFBanksExecutable(workflow.cp, 'h5add',
                                              ifos=['H1L1V1'],
                                              out_dir=out_dir,
                                              tags=['INPUT'] + args.tags)
        h5add_exe.update_current_retention_level(wf.Executable.ALL_TRIGGERS)
        h5add_node = h5add_exe.create_node(workflow.analysis_time,
                                           seed_files)
        workflow += h5add_node
        assert(len(h5add_node.output_files) == 1)
        seed_file = h5add_node.output_files[0]

    # bins_inp_file will go to the mchirp_bins generator. seed_file will go to
    # the first set of sbank jobs if not None.
    bins_inp_file = seed_file

if not workflow.cp.has_option_tags('workflow', 'use-seed-bank-for-chirp-bins',
                                   args.tags):
    out_dir = os.path.join(args.output_dir, 'coarse')
    # Generate Executable class (similar to Job in the old terminology)
    # The tags=coarse option is used to ensure that options in the
    # ['sbank-coarse'] section of the ini file are sent to this job, and *only*
    # this job
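    # For illustration only (the value is hypothetical), such a section could
    # look like:
    #   [sbank-coarse]
    #   reference-psd = /path/to/reference_psd.xml.gz
    # File-type options such as reference-psd are converted to workflow Files
    # because they are listed in SbankExecutable.file_input_options.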
    coarse_sbank_exe = SbankExecutable(workflow.cp, 'sbank',
                                       ifos=workflow.ifos,
                                       out_dir=out_dir,
                                       tags=['coarse']+args.tags)
    coarse_sbank_exe.update_current_retention_level(
        wf.Executable.MERGED_TRIGGERS)
    # Then make a specific node
    coarse_node = coarse_sbank_exe.create_node(workflow.analysis_time)
    # Add to workflow
    workflow += coarse_node
    # And record output file, as it will be needed later
    assert(len(coarse_node.output_files) == 1)
    bins_inp_file = coarse_node.output_files[0]
    if seed_file is None:
        if not workflow.cp.has_option_tags('workflow',
                                           'do-not-use-coarse-job-as-seed',
                                           args.tags):
            seed_file = bins_inp_file

if bins_inp_file is None:
    # Only get here if weird options are given
    err_msg = 'You have not given a seed bank but have asked to use the seed '
    err_msg += 'bank for generating the chirp mass bins. This is not possible.'
    raise ValueError(err_msg)

##############################################################################
# Begin the parallelization loops                                            #
##############################################################################

# How many repetitions to try? Get this from config-parser. Special
# config-parser options like this go in the [workflow] section

num_cycles = int(workflow.cp.get_opt_tags('workflow', 'num-cycles', args.tags))
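
# For example (illustrative value only), this expects something like:
#   [workflow]
#   num-cycles = 3
# in the configuration file, alongside the seed-bank and related options
# queried above.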

# Create executables up front to make plots nicer in the dashboard
bins_exe = SbankChooseMchirpBinsExecutable(workflow.cp, 'sbank_mchirp_bins',
                                    ifos=workflow.ifos, tags=args.tags)
main_sbank_exe = SbankExecutable(workflow.cp, 'sbank', ifos=workflow.ifos,
                                     tags=['parallel'] + args.tags)
h5add_first_exe = CombineHDFBanksExecutable(workflow.cp, 'h5add',
                                            ifos=['H1L1V1'],
                                            tags=['FIRST'] + args.tags)
readder_sbank_exe = SbankExecutable(workflow.cp, 'sbank',
                                    ifos=workflow.ifos,
                                    tags=['readder'] + args.tags)
h5add_final_exe = CombineHDFBanksExecutable(workflow.cp, 'h5add',
                                            ifos=['H1L1V1'],
                                            tags=['FINAL'] + args.tags)

for cycle_idx in range(num_cycles):
    #########
    # SETUP #
    #########
    cycle_tag = 'cycle%d' %(cycle_idx)
    out_dir = os.path.join(args.output_dir, cycle_tag)

    # How many banks to use? This can vary cycle to cycle, or be the same for
    # all. Either supply it once in [workflow], or in [workflow-cycleN] for
    # N in range(num_cycles)
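    # For illustration only (values are hypothetical):
    #   [workflow]
    #   nbanks = 10
    # or, to override a particular cycle:
    #   [workflow-cycle0]
    #   nbanks = 20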
    nbanks = workflow.cp.get_opt_tags('workflow', 'nbanks',
                                      tags=[cycle_tag]+args.tags)
    nbanks = int(nbanks)
    
    #############
    # MASS BINS #
    #############
    bins_exe.update_current_tags([cycle_tag] + args.tags)
    bins_exe.update_output_directory(out_dir=out_dir)
    bins_node = bins_exe.create_node(workflow.analysis_time,
                                     bins_inp_file, nbanks)
    workflow += bins_node
    assert(len(bins_node.output_files) == 1)
    bins_out_file = bins_node.output_files[0]

    #######################
    # PARALLELIZED SBANKS #
    #######################

    main_sbank_exe.update_current_tags(['parallel', cycle_tag] + args.tags)
    main_sbank_exe.update_output_directory(out_dir=out_dir)
    # These jobs we don't always want to store
    main_sbank_exe.update_current_retention_level(
        wf.Executable.INTERMEDIATE_PRODUCT)

    main_sbank_files = wf.FileList([])
    for nbank_idx in range(nbanks):
        nbank_tag = 'nbank%d' %(nbank_idx)
        main_sbank_node = main_sbank_exe.create_node(
            workflow.analysis_time, seed_bank=seed_file,
            mchirp_boundaries_file=bins_out_file,
            mchirp_boundary_idx=nbank_idx,
            extra_tags=[nbank_tag])
        workflow += main_sbank_node
        assert(len(main_sbank_node.output_files) == 1)
        main_sbank_files += main_sbank_node.output_files

    ############
    # COMBINER #
    ############

    h5add_first_exe.update_current_tags([cycle_tag, 'FIRST'] + args.tags)
    h5add_first_exe.update_output_directory(out_dir=out_dir)
    h5add_first_exe.update_current_retention_level(wf.Executable.ALL_TRIGGERS)
    h5add_node = h5add_first_exe.create_node(workflow.analysis_time,
                                              main_sbank_files)
    workflow += h5add_node
    assert(len(h5add_node.output_files) == 1)
    h5add_out = h5add_node.output_files[0]

    ###########
    # READDER #
    ###########

    readder_sbank_exe.update_current_tags([cycle_tag, 'readder'] + args.tags)
    readder_sbank_exe.update_output_directory(out_dir=out_dir)
    readder_sbank_exe.update_current_retention_level(
        wf.Executable.ALL_TRIGGERS)
    readder_sbank_node = readder_sbank_exe.create_node(workflow.analysis_time,
                                                       trial_bank=h5add_out)
    workflow += readder_sbank_node
    assert(len(readder_sbank_node.output_files) == 1)
    readder_out = readder_sbank_node.output_files[0]

    #################
    # FINAL COMBINE #
    #################

    # Is this the final cycle? If so, this combine produces the final
    # output file.
    if cycle_idx == (num_cycles - 1):
        out_dir = args.output_dir
        crl = wf.Executable.FINAL_RESULT
        output_path = args.output_file
    else:
        crl = wf.Executable.MERGED_TRIGGERS
        output_path = None

    h5add_final_exe.update_current_tags([cycle_tag, 'FINAL'] + args.tags)
    h5add_final_exe.update_output_directory(out_dir)
    h5add_final_exe.update_current_retention_level(crl)

    if seed_file is None:
        inputs = [readder_out]
    else:
        inputs = [seed_file, readder_out]

    h5add_node = h5add_final_exe.create_node(workflow.analysis_time,
                                             inputs, output=output_path)
    workflow += h5add_node
    assert(len(h5add_node.output_files) == 1)
    # This becomes the input file for the next cycle, if there is one
    seed_file = h5add_node.output_files[0]
    bins_inp_file = seed_file

workflow.save(filename=args.dax_filename, output_map_path=args.map_filename)