#!/usr/bin/env python

# Copyright (C) 2016 Ian W. Harry
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
Aligned spin bank generator. Initial placement of template points, and
rejection of "too far away" templates is done in the code, but the final
translation between points in the chirp parameters parameter space is done
in a workflow.
"""

from __future__ import division
import os
import copy
import argparse
import tempfile
import ConfigParser
import numpy
import logging
import distutils.spawn
import h5py
from scipy import spatial
from glue.ligolw import ligolw
from glue.ligolw import table
from glue.ligolw import lsctables
from glue.ligolw import ilwd
from glue.ligolw import utils as ligolw_utils
from glue.ligolw.utils import process as ligolw_process
import pycbc
import pycbc.psd
import pycbc.strain
import pycbc.version
import pycbc.tmpltbank
import pycbc.workflow as wf
import pycbc.workflow.pegasus_workflow as pwf

__author__  = "Ian Harry <ian.harry@ligo.org>"
__version__ = pycbc.version.git_verbose_msg
__date__    = pycbc.version.date
__program__ = "pycbc_geom_aligned_bank"


# Define Executable functions for workflow generation

class GeomAligned2DStackExecutable(wf.Executable):
    """ Class for running pycbc_geom_aligned_2dstack.
    """
    current_retention_level = wf.Executable.ALL_TRIGGERS

    file_input_options = ['--asd-file', '--psd-file']
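    # The options listed here take file paths as arguments; the workflow
    # module treats them as workflow input files so that the corresponding
    # files (here a PSD or ASD file) can be staged to the worker nodes.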

    def create_node(self, input_file, split_bank_num,
                    analysis_time, extra_tags=None):
        """ Create a node.
        """
        if extra_tags is None:
            extra_tags = []
        node = wf.Executable.create_node(self)

        node.add_input_opt('--input-file', input_file)
        node.add_opt('--split-bank-num', split_bank_num)

        node.new_output_file_opt(analysis_time, '.hdf', '--output-file',
                                 tags=self.tags + extra_tags + ['bank'])
        return node

class AlignedBankCatExecutable(wf.Executable):
    """ Class for running pycbc_aligned_bank_cat.
    """
    current_retention_level = wf.Executable.FINAL_RESULT

    file_input_options = ['--asd-file', '--psd-file']

    def create_node(self, input_file_list, metadata_file, analysis_time,
                    output_file_path=None):
        """ Create a node.
        """
        node = wf.Executable.create_node(self)

        node.add_input_list_opt('--input-files', input_file_list)
        node.add_input_opt('--metadata-file', metadata_file)

        if output_file_path is None:
            node.new_output_file_opt(analysis_time, '.h5', '--output-file',
                                     tags=self.tags)
        else:
            # Convert path to file
            out_file = wf.File.from_path(output_file_path)
            if self.retain_files:
                if not os.path.isabs(output_file_path):
                    out_file.storage_path = os.path.join(self.out_dir,
                                                         output_file_path)
                else:
                    out_file.storage_path = output_file_path
            node.add_output_opt('--output-file', out_file)
        return node

class TmpltbankToChiParams(wf.Executable):
    """ Class for running pycbc_tmpltbank_to_chi_params.
    """
    current_retention_level = wf.Executable.ALL_TRIGGERS

    file_input_options = ['--asd-file', '--psd-file']

    def create_node(self, input_bank, analysis_time):
        """ Create a node.
        """
        node = wf.Executable.create_node(self)

        node.add_input_opt('--input-bank', input_bank)

        node.new_output_file_opt(analysis_time, '.dat', '--output-file',
                                 tags=self.tags)
        return node

class BankVerification(wf.Executable):
    """ Class for running pycbc_bank_verification.
    """
    current_retention_level = wf.Executable.FINAL_RESULT

    file_input_options = ['--asd-file', '--psd-file']

    def create_node(self, input_bank, analysis_time):
        """ Create a node.
        """
        node = wf.Executable.create_node(self)

        node.add_input_opt('--input-bank', input_bank)

        node.new_output_file_opt(analysis_time, '.png',
                                 '--histogram-output-file', tags=self.tags)

        return node

def add_commands_to_cp(commands, options, cp, section_name):
    """Copy any of the given command-line options that were set on the
    command line into the named section of the ConfigParser object."""
    for command in commands:
        command_uscore = (command.replace('-','_'))[2:]
        if hasattr(options, command_uscore):
            attr = getattr(options, command_uscore)
            if attr is True:
                cp.set(section_name, command[2:], '')
            elif attr:
                cp.set(section_name, command[2:], str(attr))
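
# For example, if the user passed "--max-total-mass 10" then
# add_commands_to_cp(['--max-total-mass'], opts, cp, section) executes
# cp.set(section, 'max-total-mass', '10.0'); a store_true flag that was
# given becomes an option with an empty value.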

# Read command line options
_desc = __doc__[1:]
parser = argparse.ArgumentParser(description=_desc,
           formatter_class=pycbc.tmpltbank.IndentedHelpFormatterWithNL)

# Begin with code specific options
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument("-V", "--verbose", action="store_true", default=False,
                    help="verbose output")
parser.add_argument("-s", "--stack-distance", action="store", type=float,\
                  default=0.2, help="Minimum metric spacing before we "+\
                               "stack.")
parser.add_argument("-3", "--threed-lattice", action="store_true", default=False,\
                    help="Set this to use a 3D lattice. "+\
                         "OPTIONAL")
parser.add_argument("-S", "--split-bank-num", action="store", type=int,\
                    default=100,\
                    help="Number of points per job in dag. "+\
                         "OPTIONAL")
parser.add_argument("-F", "--filter-points", action="store_true", default=False,
                    help="Remove nearby points before generating the bank.")
parser.add_argument("--random-seed", action="store", type=int,\
                    default=None,
                    help="""Random seed to use whenever the numpy random
                            functions are called when doing the monte-carlo
                            for obtaining the principal components and when
                            translating all points back to physical space.
                            If this is used the code should give the same
                            output if run with the same random seed.""")
parser.add_argument("--print-chi-points", action="store", default=None,
                    metavar="FILENAME",
                    help="Add a node to print off an ASCII list of mass "
                    "parameters and corresponding location in the xi space "
                    "using pycbc_tmpltbank_to_chi_params. This will be "
                    "written to FILENAME. If this argument is not given, no "
                    "chi points file will be written.")
parser.add_argument("--workflow-name", type=str, default='bank_gen',
                    help="Name to use for the produced .dax file.")
parser.add_argument("--dax-filename", type=str, default=None,
                    help="This can be used if running this job in a "
                         "sub-workflow to specify the dax filename.")
parser.add_argument("--map-filename", type=str, default=None,
                    help="This can be used if running this job in a "
                         "sub-workflow to specify the output map filename. "
                         "WARNING: Giving this if not running as a "
                         "sub-workflow will cause pycbc_submit_dax to not "
                         "work.")
parser.add_argument("--intermediate-data-file", type=str, required=True,
                    help="The HDF file to be used to store data to pass down "
                    "to the various worker jobs.")
parser.add_argument("--metadata-file", type=str, required=True,
                    help="Location of the output file containing the metadata "
                    "that will be added to the final XML file.")
parser.add_argument("--is-sub-workflow", default=False, action="store_true",
                    help="Only give this option if this code is being run "
                    "as a sub-workflow within pegasus. If this means nothing "
                    "to you, do not give this option.")
parser.add_argument("--storage-path-base", default=None,
                    help="If running this code as a sub-workflow then this "
                    "path is pretended to all storage directories.")

pycbc.tmpltbank.insert_base_bank_options(parser)

# Insert the metric calculation options
metric_opts = pycbc.tmpltbank.insert_metric_calculation_options(parser)

# Insert the mass range options
mass_range_opts = pycbc.tmpltbank.insert_mass_range_option_group(parser)

# Insert the PSD options
psd_opts = pycbc.psd.insert_psd_option_group(parser)

# Insert the data reading options
data_opts = pycbc.strain.insert_strain_option_group(parser)

# Add the ethinca calculation options - applied in the combiner
ethinca_opts = pycbc.tmpltbank.insert_ethinca_metric_options(parser)

opts = parser.parse_args()
# Going to need this for setting up the dag later
orig_opts = copy.deepcopy(opts)

# Set up the process/process_params table and output xml document
cmds_file_name = opts.metadata_file
outdoc = ligolw.Document()
outdoc.appendChild(ligolw.LIGO_LW())
ligolw_process.register_to_xmldoc(outdoc, __program__, vars(opts), comment="",
                            ifos="", version=pycbc.version.git_hash,
                            cvs_repository='pycbc/'+pycbc.version.git_branch,
                            cvs_entry_time=pycbc.version.date)
ligolw_utils.write_filename(outdoc, cmds_file_name)

if opts.verbose:
    log_level = logging.DEBUG
else:
    log_level = logging.WARN
log_format='%(asctime)s %(message)s'
logging.basicConfig(format=log_format, level=log_level)

opts.max_mismatch = 1 - opts.min_match

pycbc.tmpltbank.verify_metric_calculation_options(opts, parser)
metricParams=pycbc.tmpltbank.metricParameters.from_argparse(opts)
pycbc.tmpltbank.verify_mass_range_options(opts, parser)
massRangeParams=pycbc.tmpltbank.massRangeParameters.from_argparse(opts)
pycbc.psd.verify_psd_options(opts, parser)
if opts.psd_estimation:
    pycbc.strain.verify_strain_options(opts, parser)
pycbc.tmpltbank.verify_ethinca_metric_options(opts, parser)
ethincaParams = pycbc.tmpltbank.ethincaParameters.from_argparse(opts)
# Ensure consistency of ethinca and bank metric parameters
pycbc.tmpltbank.check_ethinca_against_bank_params(ethincaParams, metricParams)
# Ethinca calculation should currently only be done for non-spin templates
if ethincaParams.full_ethinca and (massRangeParams.maxNSSpinMag>0.0 or
                                massRangeParams.maxBHSpinMag>0.0):
    parser.error("Ethinca metric calculation is currently not valid for "
                 "nonzero spins!")

# Set random seed if needed
if opts.random_seed is not None:
    numpy.random.seed(opts.random_seed)

# If we are going to use h(t) to estimate a PSD we need h(t)
if opts.psd_estimation:
    # FIXME: It would be nice if this was similar to psd.from_cli()
    logging.info("Obtaining h(t) for PSD generation")
    strain = pycbc.strain.from_cli(opts, pycbc.DYN_RANGE_FAC)
else:
    strain = None

# Get the PSD using the pycbc interface
logging.info("Obtaining PSD")
# We want the number of samples to be a power of two (plus one) and the
# Nyquist frequency must lie above opts.f_upper. All this assumes that
# 1 / delta_f is a power of two.
nyquistFreq = 2**numpy.ceil(numpy.log2(opts.f_upper))
numSamples = int(round(nyquistFreq / opts.delta_f)) + 1
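# For example, with f_upper = 1000 Hz and delta_f = 0.01 Hz this gives
# nyquistFreq = 1024 Hz and numSamples = 102401.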
psd = pycbc.psd.from_cli(opts, length=numSamples, delta_f=opts.delta_f, \
                         low_frequency_cutoff=opts.f_low, strain=strain,
                         dyn_range_factor=pycbc.DYN_RANGE_FAC)
metricParams.psd = psd

# Begin by calculating a metric
logging.info("Calculating metric")
metricParams = pycbc.tmpltbank.determine_eigen_directions(metricParams)

logging.info("Calculating covariance matrix")

vals = pycbc.tmpltbank.estimate_mass_range(1000000, massRangeParams, \
       metricParams, metricParams.fUpper, covary=False) 
cov = numpy.cov(vals)
evalsCV, evecsCV = numpy.linalg.eig(cov)
evecsCVdict = {}
evecsCVdict[metricParams.fUpper] = evecsCV
metricParams.evecsCV = evecsCVdict
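
# The eigenvectors of this covariance matrix rotate the metric-scaled
# coordinates onto the principal axes of the sampled parameter space; these
# rotated coordinates are the "xi" coordinates in which the template
# lattice is constructed below.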


logging.info("Determining parameter space extent")

vals = pycbc.tmpltbank.estimate_mass_range(1000000, massRangeParams, \
       metricParams, metricParams.fUpper, covary=True)

chi1Max = vals[0].max()
chi1Min = vals[0].min()
chi1Diff = chi1Max - chi1Min
chi2Max = vals[1].max()
chi2Min = vals[1].min()
chi2Diff = chi2Max - chi2Min

logging.info("Calculating lattice")

if not opts.threed_lattice:
    v1s,v2s = pycbc.tmpltbank.generate_hexagonal_lattice(\
              chi1Max+(0.02*chi1Diff), chi1Min-(0.02*chi1Diff),\
              chi2Max+(0.02*chi2Diff), chi2Min-(0.02*chi2Diff),\
              opts.max_mismatch)
else:
    chi3Max = vals[2].max()
    chi3Min = vals[2].min()
    chi3Diff = chi3Max - chi3Min
    v1s, v2s, v3s = pycbc.tmpltbank.generate_anstar_3d_lattice(\
          chi1Max+(0.02*chi1Diff), chi1Min-(0.02*chi1Diff),\
          chi2Max+(0.02*chi2Diff), chi2Min-(0.02*chi2Diff),\
          chi3Max+(0.02*chi3Diff), chi3Min-(0.02*chi3Diff), opts.max_mismatch)

logging.info("Lattice contains %d points" % len(v1s))

# Now remove points that are too far from edges of parameter space

if opts.filter_points:
    logging.info("Removing lattice points too far from physical space.")
    # Create a large set of points, and map them to xi_i coordinates to
    # give a starting point when mapping from xi_i back to masses and
    # spins. The EM constraint is used only if requested.
    rTotmass, rEta, rBeta, rSigma, rGamma, rSpin1z, rSpin2z = \
          pycbc.tmpltbank.get_random_mass(2000000, massRangeParams)
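    # Recover the component masses from total mass and symmetric mass
    # ratio: eta = m1*m2 / M**2 implies m1 - m2 = M*sqrt(1 - 4*eta), so
    # m1,2 = (M +/- M*sqrt(1 - 4*eta)) / 2.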
    diff = (rTotmass*rTotmass * (1-4*rEta))**0.5
    rMass1 = (rTotmass + diff)/2.
    rMass2 = (rTotmass - diff)/2.
    rChis = (rSpin1z + rSpin2z)/2.

    rXis = pycbc.tmpltbank.get_cov_params(rTotmass, rEta, rBeta, rSigma,
               rGamma, rChis, metricParams, metricParams.fUpper)

    xis = (numpy.array(rXis)).T
    physMasses = numpy.array([rTotmass, rEta, rSpin1z, rSpin2z])
    physMasses = physMasses.T
    f0 = opts.f0
    order = opts.pn_order
    maxmass1 = opts.max_mass1
    maxmass2 = opts.max_mass2
    minmass1 = opts.min_mass1
    minmass2 = opts.min_mass2
    maxNSspin = opts.max_ns_spin_mag
    maxBHspin = opts.max_bh_spin_mag

    newV1s = []
    newV2s = []
    if opts.threed_lattice:
        newV3s = []

    # Use scipy's KDtree to quickly calculate Euclidean distances
    logging.info("Setting up KDtree to compute distances.")
    if opts.threed_lattice:
        tree = spatial.KDTree(xis[:,:3])
        xi_points = zip(v1s,v2s,v3s)
    else:
        tree = spatial.KDTree(xis[:,:2])
        xi_points = zip(v1s,v2s)

    logging.info("Computing distances using KDtree.")
    dists, pointargs = tree.query(xi_points)
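
    # tree.query returns, for each lattice point, the Euclidean distance to
    # the nearest of the random physical points (and that point's index).
    # Lattice points more than 2 units from any physical point in the
    # metric-scaled xi coordinates are assumed to lie well outside the
    # physical region and are discarded below.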

    logging.info("Removing far-away points.")

    for i in xrange(len(v1s)):
        if dists[i] < 2.:
            newV1s.append(v1s[i])
            newV2s.append(v2s[i])
            if opts.threed_lattice:
                newV3s.append(v3s[i])

    logging.info("Filtered lattice contains %d points" % len(newV1s))
else:
    newV1s = v1s
    newV2s = v2s
    if opts.threed_lattice:
        newV3s = v3s

# Now begin to generate the dag
h5file = h5py.File(opts.intermediate_data_file, 'w')
# Dump the full bank in \xi_i coordinates
h5file['full_bank/v1s'] = newV1s
h5file['full_bank/v2s'] = newV2s
if opts.threed_lattice:
    h5file['full_bank/v3s'] = newV3s

# Now store split banks
bank_num = 0

logging.info("Printing split banks to HDF file.")
v1s = []
v2s = []
v3s = []
for i in xrange(len(newV1s)):
    v1s.append(newV1s[i])
    v2s.append(newV2s[i])
    if opts.threed_lattice:
        v3s.append(newV3s[i])
    if not (i+1) % opts.split_bank_num:
        h5file['split_banks/split_bank_%05d/v1s' % bank_num] = v1s
        h5file['split_banks/split_bank_%05d/v2s' % bank_num] = v2s
        v1s = []
        v2s = []
        if opts.threed_lattice:
            h5file['split_banks/split_bank_%05d/v3s' % bank_num] = v3s
            v3s = []
        bank_num = bank_num + 1

if len(v1s):
    # There are still templates to dump
    h5file['split_banks/split_bank_%05d/v1s' % bank_num] = v1s
    h5file['split_banks/split_bank_%05d/v2s' % bank_num] = v2s
    if opts.threed_lattice:
        h5file['split_banks/split_bank_%05d/v3s' % bank_num] = v3s
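
# The resulting layout of the intermediate HDF file is:
#   full_bank/{v1s, v2s[, v3s]}
#   split_banks/split_bank_00000/{v1s, v2s[, v3s]}
#   split_banks/split_bank_00001/{v1s, v2s[, v3s]}
#   ...
# with one split_bank group per pycbc_geom_aligned_2dstack job.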

logging.info('Adding metric information to HDF file.')
h5file['cov_evecs'] = metricParams.evecsCV[metricParams.fUpper]
h5file['metric_evals'] = metricParams.evals[metricParams.fUpper]
h5file['metric_evecs'] = metricParams.evecs[metricParams.fUpper]

h5file.close()
  
# And begin dag generation
# First: Set up the config parser.
cp = ConfigParser.ConfigParser()
# Workflow first:
cp.add_section('workflow')
cp.set('workflow', 'file-retention-level', 'all_files')
cp.set('workflow', 'start-time', '900000000')
cp.set('workflow', 'end-time', '900010000')
cp.add_section('workflow-ifos')
cp.set('workflow-ifos', 'h1', '')
cp.set('workflow-ifos', 'l1', '')
cp.set('workflow-ifos', 'v1', '')

# Then executables
cp.add_section('executables')
cp.set('executables', 'aligned2dstack', '${which:pycbc_geom_aligned_2dstack}')
cp.set('executables', 'alignedbankcat', '${which:pycbc_aligned_bank_cat}')
cp.set('executables', 'dumptochis', '${which:pycbc_tmpltbank_to_chi_params}')
cp.set('executables', 'bankverify', '${which:pycbc_bank_verification}')

# Shared option groups
cp.add_section('sharedoptions')
cp.set('sharedoptions', 'massranges', 'aligned2dstack,dumptochis,bankverify')
cp.set('sharedoptions', 'metric', 'alignedbankcat,dumptochis,bankverify')
cp.set('sharedoptions', 'psd', 'alignedbankcat,dumptochis,bankverify')
cp.set('sharedoptions', 'data', 'alignedbankcat,dumptochis,bankverify')
cp.set('sharedoptions', 'ethinca', 'alignedbankcat')

cp.add_section('sharedoptions-massranges')
mass_range_commands = pycbc.tmpltbank.get_options_from_group(mass_range_opts)
add_commands_to_cp(mass_range_commands, orig_opts, cp,
                   'sharedoptions-massranges')
cp.add_section('sharedoptions-metric')
metric_commands = pycbc.tmpltbank.get_options_from_group(metric_opts)
add_commands_to_cp(metric_commands, orig_opts, cp, 'sharedoptions-metric')
cp.add_section('sharedoptions-psd')
psd_commands = pycbc.tmpltbank.get_options_from_group(psd_opts)
add_commands_to_cp(psd_commands, orig_opts, cp, 'sharedoptions-psd')
cp.add_section('sharedoptions-data')
data_commands = pycbc.tmpltbank.get_options_from_group(data_opts)
add_commands_to_cp(data_commands, orig_opts, cp, 'sharedoptions-data')
cp.add_section('sharedoptions-ethinca')
ethinca_commands = pycbc.tmpltbank.get_options_from_group(ethinca_opts)
add_commands_to_cp(ethinca_commands, orig_opts, cp, 'sharedoptions-ethinca')
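
# Each sharedoptions entry above causes the options in the corresponding
# sharedoptions-* section to be passed on the command line of every listed
# executable, so the worker jobs see the same mass-range, metric, PSD and
# data settings that were used here.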

# 2dstack
cp.add_section('aligned2dstack')
cp.set('aligned2dstack', 'pn-order', opts.pn_order)
cp.set('aligned2dstack', 'f0', str(opts.f0))
cp.set('aligned2dstack', 'min-match', str(opts.min_match))
cp.set('aligned2dstack', 'stack-distance', str(opts.stack_distance))
if opts.threed_lattice:
    cp.set('aligned2dstack', 'threed-lattice', '')
if opts.random_seed:
    cp.set('aligned2dstack', 'random-seed', str(opts.random_seed))
cp.add_section('pegasus_profile-aligned2dstack')
cp.set('pegasus_profile-aligned2dstack', 'condor|request_memory', '3000')

# alignedcat
cp.add_section('alignedbankcat')
if opts.f_low_column is not None:
    cp.set('alignedbankcat', 'f-low-column', opts.f_low_column)
cp.add_section('pegasus_profile-alignedbankcat')
cp.set('pegasus_profile-alignedbankcat', 'condor|request_memory', '3000')

# dumptochis
cp.add_section('dumptochis')
cp.add_section('pegasus_profile-dumptochis')
cp.set('pegasus_profile-dumptochis', 'condor|request_memory', '3000')

# bank verification
cp.add_section('bankverify')
cp.set('bankverify', 'bin-spacing', str(1 - opts.min_match))
cp.add_section('pegasus_profile-bankverify')
cp.set('pegasus_profile-bankverify', 'condor|request_memory', '3000')

with open('temp.ini', 'w') as temp_fp:
    cp.write(temp_fp)

opts.config_files=['temp.ini']
opts.config_overrides=[]

# Now setup the workflow
workflow = wf.Workflow(opts, opts.workflow_name)

# Start with 2dstack jobs
if opts.storage_path_base:
    curr_dir = os.path.join(opts.storage_path_base, 'stack2d')
else:
    curr_dir = 'stack2d'
#wf.makedir(curr_dir)

stack_exe = GeomAligned2DStackExecutable(workflow.cp, 'aligned2dstack',
                                   ifos=workflow.ifos, out_dir=curr_dir)
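# num_banks is the ceiling of len(newV1s) / opts.split_bank_num: one
# 2dstack job per split bank written above. For example, 250 points with
# --split-bank-num 100 gives 3 jobs.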
num_banks = int((len(newV1s) - 0.5)//opts.split_bank_num) + 1
if not opts.is_sub_workflow:
    input_h5file = wf.File.from_path(opts.intermediate_data_file)
else:
    # Basically, if this is a sub-workflow do not store a PFN, as it will be
    # some meaningless temporary path, but instead tell the uber-workflow
    # that these files are outputs and used within the sub-dax and let it
    # handle the rest.
    input_h5file = pwf.File(os.path.basename(opts.intermediate_data_file))


all_outs = wf.FileList([])

for idx in xrange(num_banks):
    stackid = 'job%05d' %(idx)
    stack_node = stack_exe.create_node(input_h5file, idx,
                                       workflow.analysis_time,
                                       extra_tags=[stackid])
    workflow += stack_node
    stack_outs = stack_node.output_files
    assert(len(stack_outs) == 1)
    all_outs.append(stack_outs[0])

# Then combine everything together
if opts.storage_path_base:
    curr_dir = opts.storage_path_base
else:
    curr_dir = '.'

combine_exe = AlignedBankCatExecutable(workflow.cp, 'alignedbankcat',
                                        ifos=workflow.ifos, out_dir=curr_dir)
if not opts.is_sub_workflow:
    metadata_file = wf.File.from_path(cmds_file_name)
else:
    metadata_file = pwf.File(os.path.basename(cmds_file_name))

combine_node = combine_exe.create_node(all_outs, metadata_file,
                                       workflow.analysis_time,
                                       output_file_path=opts.output_file)
workflow += combine_node
assert(len(combine_node.output_files) == 1)
out_bank = combine_node.output_files[0]

# And do the convert to chis job
chiparams_exe = TmpltbankToChiParams(workflow.cp, 'dumptochis',
                                      ifos=workflow.ifos, out_dir=curr_dir)
chiparams_node = chiparams_exe.create_node(out_bank, workflow.analysis_time)
workflow += chiparams_node

# And the bank verification job
bankverify_exe = BankVerification(workflow.cp, 'bankverify',
                                  ifos=workflow.ifos, out_dir=curr_dir)
bankverify_node = bankverify_exe.create_node(out_bank, workflow.analysis_time)
workflow += bankverify_node

workflow.save(filename=opts.dax_filename, output_map=opts.map_filename)

logging.info("GENERATION SUCCESSFUL.")
logging.info("Submit the resulting dax file to generate your template bank.")
logging.info("See pycbc_submit_dax --help for instructions.")