Revision ba0dc5ddaad56dea2d6207a14b2087832e8d2211 authored by Collin Capano on 16 September 2017, 22:10:52 UTC, committed by Soumi De on 16 September 2017, 22:10:52 UTC
* first pass at adding CustomTransform class
* add functions
* add from config, remove inverse for now
* add waveform transforms to likelihood
* add waveform transforms to pycbc_inference
* fix bugs
* fix more bugs
* fix doc
1 parent 5c16927
pycbc_create_sbank_workflow
#!/usr/bin/env python
# Copyright (C) 2016 Ian W. Harry, Y Ddraig Goch
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Workflow generator for the lalapps_cbc_sbank template bank generation.
This is intended to be standalone, without putting things like the
SbankExecutable class in the pycbc.workflow module, to give an illustration of
how a simple workflow is constructed with pycbc.workflow.
"""
# Imports
from __future__ import division
import os
import argparse
import pycbc
import pycbc.version
import pycbc.workflow as wf
import pycbc.workflow.pegasus_workflow as pwf
# Boiler-plate stuff
__author__ = "Ian Harry <ian.harry@ligo.org>"
__version__ = pycbc.version.git_verbose_msg
__date__ = pycbc.version.date
__program__ = "pycbc_create_sbank_workflow"
# We define classes for all executables used in the workflow
class SbankExecutable(wf.Executable):
""" Class for running lalapps_cbc_sbank
"""
# This can be altered if you don't always want to store output files
current_retention_level = wf.Executable.FINAL_RESULT
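    # Retention levels used in this workflow (all defined on wf.Executable),
    # roughly from "always keep" to "usually discard": FINAL_RESULT,
    # MERGED_TRIGGERS, ALL_TRIGGERS, INTERMEDIATE_PRODUCT. Whether a given
    # file is actually stored also depends on how the workflow as a whole
    # is configured.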
# This tells us that reference-psd is a file option
file_input_options = ['--reference-psd']
sbank_job_seed = 0
def create_node(self, analysis_time, seed_bank=None, trial_bank=None,
mchirp_boundaries_file=None, mchirp_boundary_idx=None,
extra_tags=None):
if extra_tags is None:
extra_tags = []
node = wf.Executable.create_node(self)
# Most options are specified in the config file. In some cases though,
# for example input/output files, options are specified directly in
# the create_node function. *DO NOT* specify these in the config file.
# The seed must be unique for each job and reproducible
node.add_opt('--seed', str(self.sbank_job_seed))
SbankExecutable.sbank_job_seed += 1
# These input files are optional. If given, add them
if seed_bank is not None:
node.add_input_opt('--bank-seed', seed_bank)
if trial_bank is not None:
node.add_input_opt('--trial-waveforms', trial_bank)
if mchirp_boundaries_file is not None:
node.add_input_opt('--mchirp-boundaries-file',
mchirp_boundaries_file)
# The boundaries file option also requires the boundary idx
assert(mchirp_boundary_idx is not None)
node.add_opt('--mchirp-boundaries-index', mchirp_boundary_idx)
# Here we add the output file, but we are letting pycbc.workflow
# handle how to name the file
node.new_output_file_opt(analysis_time, '.h5',
'--output-filename', tags=self.tags + extra_tags)
return node
class SbankChooseMchirpBinsExecutable(wf.Executable):
""" Class for running lalapps_cbc_sbank_choose_mchirp_boundaries
"""
current_retention_level = wf.Executable.ALL_TRIGGERS
def create_node(self, analysis_time, input_file, nbanks):
node = wf.Executable.create_node(self)
# Here we add the output file
node.new_output_file_opt(analysis_time, '.txt',
'--output-file', tags=self.tags)
# And the input file, which is an argument, not an option
node.add_input_arg(input_file)
# nbanks is just a normal option, but as it affects the workflow
# structure, it is supplied here and not directly in the config file
node.add_opt('--nbanks', nbanks)
return node
# There is already a ligolw_add executable (wf.LigolwAddExecutable), but it
# needs a minor change here because we are potentially dealing with sub-daxes.
class LigolwAddExecutable(wf.LigolwAddExecutable):
def create_node(self, jobSegment, input_files, output=None,
use_tmp_subdirs=True, tags=None):
if output is not None:
# Convert path to file
out_file = wf.File.from_path(output)
if self.retain_files:
if not os.path.isabs(output):
out_file.storage_path = os.path.join(self.out_dir,
output)
else:
out_file.storage_path = output
else:
out_file = output
        return super(LigolwAddExecutable, self).create_node(
            jobSegment, input_files, output=out_file,
            use_tmp_subdirs=use_tmp_subdirs, tags=tags)
class CombineHDFBanksExecutable(wf.Executable):
""" Class for running a combination of hdf banks
"""
current_retention_level = wf.Executable.ALL_TRIGGERS
def create_node(self, analysis_time, input_file_list, output=None,
tags=None):
node = wf.Executable.create_node(self)
# Here we add the input files
node.add_input_list_opt('--input-filenames', input_file_list)
curr_tags = self.tags
if tags is not None:
curr_tags += tags
# Output file
if output is not None:
# Convert path to file
out_file = wf.File.from_path(output)
if self.retain_files:
if not os.path.isabs(output):
out_file.storage_path = os.path.join(self.out_dir,
output)
else:
out_file.storage_path = output
node.add_output_opt('--output-file', out_file)
else:
node.new_output_file_opt(analysis_time, '.h5',
'--output-file', tags=curr_tags)
return node
##############################################################################
# Argument parsing and setup of workflow #
##############################################################################
# Use the standard workflow command-line parsing routines. Things like a
# configuration file are specified within the "workflow command line group"
# so run this with --help to see what options are added.
_desc = __doc__[1:]
parser = argparse.ArgumentParser(description=_desc)
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument("--workflow-name", type=str, default='sbank_workflow',
help="Descriptive name of the analysis.")
parser.add_argument("-d", "--output-dir", default=None,
help="Path to output directory.")
parser.add_argument("--output-file", type=str, default=None,
help="Specify the output file name. Either a name can be "
"provided or a full path to file. Is this is not "
"given a filename and location is chosen "
"automatically.")
parser.add_argument("--dax-filename", type=str, default=None,
help="This can be used if running this job in a "
"sub-workflow to specify the dax filename.")
parser.add_argument("--map-filename", type=str, default=None,
help="This can be used if running this job in a "
"sub-workflow to specify the output map filename. "
"WARNING: Giving this if not running as a "
"sub-workflow will cause pycbc_submit_dax to not "
"work.")
parser.add_argument("--is-sub-workflow", default=False, action="store_true",
help="Only give this option if this code is being run "
"as a sub-workflow within pegasus. If this means nothing "
"to you, do not give this option.")
parser.add_argument("--tags", default=[], nargs="*", action="store",
help="If this option is given all jobs, and all workflow "
"configuration options, will use the tags given "
"here. This can be used if running this as a "
"sub-workflow to give two sets of options to two "
"different invocations of the sbank workflow.")
wf.add_workflow_command_line_group(parser)
args = parser.parse_args()
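# A typical standalone invocation might look like the following (illustrative
# only; --config-files comes from the workflow command-line group added
# above):
#
#   pycbc_create_sbank_workflow --workflow-name sbank \
#       --output-dir output --config-files sbank.ini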
# Create the workflow object
workflow = wf.Workflow(args, args.workflow_name)
if not args.is_sub_workflow:
wf.makedir(args.output_dir)
os.chdir(args.output_dir)
args.output_dir = '.'
##############################################################################
# Do I have a seed bank provided and will this be used for chirp mass #
# boundaries? If not then add the COARSE job. #
##############################################################################
seed_file = None
bins_inp_file = None
if workflow.cp.has_option_tags('workflow', 'seed-bank', args.tags):
# If a seed bank is provided register it as a File object
seed_banks = workflow.cp.get_opt_tags('workflow', 'seed-bank', args.tags)
    # split() (rather than split(' ')) drops empty tokens, so an empty
    # option actually trips the check below
    seed_banks = seed_banks.split()
    if len(seed_banks) == 0:
        raise ValueError("No seed bank actually provided!")
seed_files = []
for seed_bank in seed_banks:
if not args.is_sub_workflow:
seed_file = wf.File.from_path(seed_bank)
else:
seed_file = pwf.File(os.path.basename(seed_bank))
seed_files.append(seed_file)
if len(seed_files) == 1:
seed_file = seed_files[0]
else:
# Combine with h5add
out_dir = os.path.join(args.output_dir, 'input_combine')
h5add_exe = CombineHDFBanksExecutable(workflow.cp, 'h5add',
ifos=['H1L1V1'],
out_dir=out_dir,
tags=['INPUT'] + args.tags)
h5add_exe.update_current_retention_level(wf.Executable.ALL_TRIGGERS)
h5add_node = h5add_exe.create_node(workflow.analysis_time,
seed_files)
workflow += h5add_node
assert(len(h5add_node.output_files) == 1)
seed_file = h5add_node.output_files[0]
# bins_inp_file will go to the mchirp bins generator. seed_file will go to
# the first set of sbank jobs if it is not None.
bins_inp_file = seed_file
if not workflow.cp.has_option_tags('workflow', 'use-seed-bank-for-chirp-bins',
args.tags):
out_dir = os.path.join(args.output_dir, 'coarse')
    # Generate the Executable class (similar to Job in the old terminology).
    # The tags=['coarse'] option is used to ensure that options in the
    # [sbank-coarse] section of the ini file are sent to this job, and
    # *only* to this job
coarse_sbank_exe = SbankExecutable(workflow.cp, 'sbank',
ifos=workflow.ifos,
out_dir=out_dir,
tags=['coarse']+args.tags)
    coarse_sbank_exe.update_current_retention_level(
        wf.Executable.MERGED_TRIGGERS)
# Then make a specific node
coarse_node = coarse_sbank_exe.create_node(workflow.analysis_time)
# Add to workflow
workflow += coarse_node
# And record output file, as it will be needed later
assert(len(coarse_node.output_files) == 1)
bins_inp_file = coarse_node.output_files[0]
if seed_file is None:
if not workflow.cp.has_option_tags('workflow',
'do-not-use-coarse-job-as-seed',
args.tags):
seed_file = bins_inp_file
if bins_inp_file is None:
# Only get here if weird options are given
err_msg = 'You have not given a seed bank but have asked to use the seed '
err_msg += 'bank for generating the chirp mass bins. This is not possible.'
raise ValueError(err_msg)
##############################################################################
# Begin the parallelization loops #
##############################################################################
# How many repetitions to try? Get this from config-parser. Special
# config-parser options like this go in the [workflow] section
num_cycles = int(workflow.cp.get_opt_tags('workflow', 'num-cycles', args.tags))
# Create executables up front to make plots nicer in the dashboard
bins_exe = SbankChooseMchirpBinsExecutable(workflow.cp, 'sbank_mchirp_bins',
ifos=workflow.ifos, tags=args.tags)
main_sbank_exe = SbankExecutable(workflow.cp, 'sbank', ifos=workflow.ifos,
tags=['parallel'] + args.tags)
h5add_first_exe = CombineHDFBanksExecutable(workflow.cp, 'h5add',
ifos=['H1L1V1'],
tags=['FIRST'] + args.tags)
readder_sbank_exe = SbankExecutable(workflow.cp, 'sbank',
ifos=workflow.ifos,
tags=['readder'] + args.tags)
h5add_final_exe = CombineHDFBanksExecutable(workflow.cp, 'h5add',
ifos=['H1L1V1'],
tags=['FINAL'] + args.tags)
for cycle_idx in range(num_cycles):
#########
# SETUP #
#########
cycle_tag = 'cycle%d' %(cycle_idx)
out_dir = os.path.join(args.output_dir, cycle_tag)
# How many banks to use? This can vary cycle to cycle, or be the same for
# all. Either supply it once in [workflow], or in [workflow-cycleN] for
# N in range(num_cycles)
nbanks = workflow.cp.get_opt_tags('workflow', 'nbanks',
tags=[cycle_tag]+args.tags)
nbanks = int(nbanks)
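    # For example (illustrative values only), one could override the default
    # for the first cycle with:
    #
    #   [workflow-cycle0]
    #   nbanks = 20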
#############
# MASS BINS #
#############
bins_exe.update_current_tags([cycle_tag] + args.tags)
bins_exe.update_output_directory(out_dir=out_dir)
bins_node = bins_exe.create_node(workflow.analysis_time,
bins_inp_file, nbanks)
workflow += bins_node
assert(len(bins_node.output_files) == 1)
bins_out_file = bins_node.output_files[0]
#######################
    # PARALLELIZED SBANKS #
#######################
main_sbank_exe.update_current_tags(['parallel', cycle_tag] + args.tags)
main_sbank_exe.update_output_directory(out_dir=out_dir)
# These jobs we don't always want to store
    main_sbank_exe.update_current_retention_level(
        wf.Executable.INTERMEDIATE_PRODUCT)
main_sbank_files = wf.FileList([])
for nbank_idx in range(nbanks):
nbank_tag = 'nbank%d' %(nbank_idx)
        main_sbank_node = main_sbank_exe.create_node(
            workflow.analysis_time, seed_bank=seed_file,
            mchirp_boundaries_file=bins_out_file,
            mchirp_boundary_idx=nbank_idx,
            extra_tags=[nbank_tag])
workflow += main_sbank_node
assert(len(main_sbank_node.output_files) == 1)
main_sbank_files += main_sbank_node.output_files
############
# COMBINER #
############
h5add_first_exe.update_current_tags([cycle_tag, 'FIRST'] + args.tags)
h5add_first_exe.update_output_directory(out_dir=out_dir)
h5add_first_exe.update_current_retention_level(wf.Executable.ALL_TRIGGERS)
h5add_node = h5add_first_exe.create_node(workflow.analysis_time,
main_sbank_files)
workflow += h5add_node
assert(len(h5add_node.output_files) == 1)
h5add_out = h5add_node.output_files[0]
###########
# READDER #
###########
readder_sbank_exe.update_current_tags([cycle_tag, 'readder'] + args.tags)
readder_sbank_exe.update_output_directory(out_dir=out_dir)
    readder_sbank_exe.update_current_retention_level(
        wf.Executable.ALL_TRIGGERS)
readder_sbank_node = readder_sbank_exe.create_node(workflow.analysis_time,
trial_bank=h5add_out)
workflow += readder_sbank_node
assert(len(readder_sbank_node.output_files) == 1)
readder_out = readder_sbank_node.output_files[0]
#################
# FINAL COMBINE #
#################
# Is this the final output file?
if cycle_idx == (num_cycles - 1):
out_dir = args.output_dir
crl = wf.Executable.FINAL_RESULT
output_path = args.output_file
else:
crl = wf.Executable.MERGED_TRIGGERS
output_path = None
h5add_final_exe.update_current_tags([cycle_tag, 'FINAL'] + args.tags)
h5add_final_exe.update_output_directory(out_dir)
h5add_final_exe.update_current_retention_level(crl)
if seed_file is None:
inputs = [readder_out]
else:
inputs = [seed_file, readder_out]
h5add_node = h5add_final_exe.create_node(workflow.analysis_time,
inputs, output=output_path)
workflow += h5add_node
assert(len(h5add_node.output_files) == 1)
    # This becomes the input file for the next cycle, if there is one
seed_file = h5add_node.output_files[0]
bins_inp_file = seed_file
workflow.save(filename=args.dax_filename, output_map_path=args.map_filename)
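# The saved dax can then be planned and submitted, for example with
# pycbc_submit_dax (not shown here; see the warning on --map-filename above).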