https://github.com/gwastro/pycbc
Tip revision: 941838438f3ad527f32604481a8da41207cd091d authored by Ian Harry on 12 October 2023, 13:09:30 UTC
Prepare for release (#4526)
Prepare for release (#4526)
Tip revision: 9418384
grb_utils.py
# Copyright (C) 2015 Andrew Williamson
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
This library code contains functions and classes that are used in the
generation of pygrb workflows. For details about pycbc.workflow see here:
http://pycbc.org/pycbc/latest/html/workflow.html
"""
import glob
import os
import numpy as np
from scipy.stats import rayleigh
from gwdatafind.utils import filename_metadata
from pycbc import makedir
from pycbc.workflow.core import \
File, FileList, configparser_value_to_file, resolve_url_to_file,\
Executable, Node
from pycbc.workflow.jobsetup import select_generic_executable
from pycbc.workflow.pegasus_workflow import SubWorkflow
from pycbc.workflow.plotting import PlotExecutable
def _select_grb_pp_class(wflow, curr_exe):
"""
This function returns the class for PyGRB post-processing scripts.
Parameters
----------
curr_exe : string
The name of the executable
Returns
-------
exe_class : Sub-class of pycbc.workflow.core.Executable that holds utility
functions appropriate for the given executable. Instances of the class
('jobs') **must** have methods
* job.create_node()
and
* job.get_valid_times(ifo, )
"""
exe_path = wflow.cp.get('executables', curr_exe)
exe_name = os.path.basename(exe_path)
exe_to_class_map = {
'pycbc_grb_trig_combiner': PycbcGrbTrigCombinerExecutable,
'pycbc_grb_trig_cluster': PycbcGrbTrigClusterExecutable,
'pycbc_grb_inj_finder': PycbcGrbInjFinderExecutable,
'pycbc_grb_inj_combiner': PycbcGrbInjCombinerExecutable
}
if exe_name not in exe_to_class_map:
raise ValueError(f"No job class exists for executable {curr_exe}")
return exe_to_class_map[exe_name]
def set_grb_start_end(cp, start, end):
"""
Function to update analysis boundaries as workflow is generated
Parameters
----------
cp : pycbc.workflow.configuration.WorkflowConfigParser object
The parsed configuration options of a pycbc.workflow.core.Workflow.
start : int
The start of the workflow analysis time.
end : int
The end of the workflow analysis time.
Returns
--------
cp : pycbc.workflow.configuration.WorkflowConfigParser object
The modified WorkflowConfigParser object.
"""
cp.set("workflow", "start-time", str(start))
cp.set("workflow", "end-time", str(end))
return cp
def make_gating_node(workflow, datafind_files, outdir=None, tags=None):
'''
Generate jobs for autogating the data for PyGRB runs.
Parameters
----------
workflow: pycbc.workflow.core.Workflow
An instanced class that manages the constructed workflow.
datafind_files : pycbc.workflow.core.FileList
A FileList containing the frame files to be gated.
outdir : string
Path of the output directory
tags : list of strings
If given these tags are used to uniquely name and identify output files
that would be produced in multiple calls to this function.
Returns
--------
condition_strain_nodes : list
List containing the pycbc.workflow.core.Node objects representing the
autogating jobs.
condition_strain_outs : pycbc.workflow.core.FileList
FileList containing the pycbc.workflow.core.File objects representing
the gated frame files.
'''
cp = workflow.cp
if tags is None:
tags = []
condition_strain_class = select_generic_executable(workflow,
"condition_strain")
condition_strain_nodes = []
condition_strain_outs = FileList([])
for ifo in workflow.ifos:
input_files = FileList([datafind_file for datafind_file in \
datafind_files if datafind_file.ifo == ifo])
condition_strain_jobs = condition_strain_class(cp, "condition_strain",
ifos=ifo, out_dir=outdir, tags=tags)
condition_strain_node, condition_strain_out = \
condition_strain_jobs.create_node(input_files, tags=tags)
condition_strain_nodes.append(condition_strain_node)
condition_strain_outs.extend(FileList([condition_strain_out]))
return condition_strain_nodes, condition_strain_outs
def fermi_core_tail_model(
sky_err, rad, core_frac=0.98, core_sigma=3.6, tail_sigma=29.6):
"""Fermi systematic error model following
https://arxiv.org/abs/1909.03006, with default values valid
before 11 September 2019.
Parameters
----------
core_frac : float
Fraction of the systematic uncertainty contained within the core
component.
core_sigma : float
Size of the GBM systematic core component.
tail_sigma : float
Size of the GBM systematic tail component.
Returns
_______
tuple
Tuple containing the core and tail probability distributions
as a function of radius.
"""
scaledsq = sky_err**2 / -2 / np.log(0.32)
return (
frac * (1 - np.exp(-0.5 * (rad / np.sqrt(scaledsq + sigma**2))**2))
for frac, sigma
in zip([core_frac, 1 - core_frac], [core_sigma, tail_sigma]))
def get_sky_grid_scale(
sky_error=0.0, containment=0.9, upscale=False, fermi_sys=False,
precision=1e-3, **kwargs):
"""
Calculate the angular radius corresponding to a desired
localization uncertainty level. This is used to generate the search
grid and involves scaling up the standard 1-sigma value provided to
the workflow, assuming a normal probability profile. Fermi
systematic errors can be included, following
https://arxiv.org/abs/1909.03006, with default values valid before
11 September 2019. The default probability coverage is 90%.
Parameters
----------
sky_error : float
The reported statistical 1-sigma sky error of the trigger.
containment : float
The desired localization probability to be covered by the sky
grid.
upscale : bool, optional
Whether to apply rescale to convert from 1 sigma -> containment
for non-Fermi triggers. Default = True as Swift reports 90%
radius directly.
fermi_sys : bool, optional
Whether to apply Fermi-GBM systematics via
``fermi_core_tail_model``. Default = False.
precision : float, optional
Precision (in degrees) for calculating the error radius via
Fermi-GBM model.
**kwargs
Additional keyword arguments passed to `fermi_core_tail_model`.
Returns
_______
float
Sky error radius in degrees.
"""
if fermi_sys:
lims = (0.5, 4)
radii = np.linspace(
lims[0] * sky_error, lims[1] * sky_error,
int((lims[1] - lims[0]) * sky_error / precision) + 1)
core, tail = fermi_core_tail_model(sky_error, radii, **kwargs)
out = radii[(abs(core + tail - containment)).argmin()]
else:
# Use Rayleigh distribution to go from 1 sigma containment to
# containment given by function variable. Interval method returns
# bounds of equal probability about the median, but we want 1-sided
# bound, hence use (2 * containment - 1)
out = sky_error
if upscale:
out *= rayleigh.interval(2 * containment - 1)[-1]
return out
def setup_pygrb_pp_workflow(wf, pp_dir, seg_dir, segment, insp_files,
inj_files, inj_insp_files, inj_tags):
"""
Generate post-processing section of PyGRB offline workflow
"""
pp_outs = FileList([])
# pp_outs is returned by this function. It is structured as follows:
# pp_outs[0]: [ALL_TIMES, ONSOURCE, OFFSOURCE, OFFTRIAL_1, ..., OFFTRIAL_N]
# FileList (N can be set by the user and is 6 by default)
# pp_outs[1]: ALL_TIMES_CLUSTERED File
# pp_outs[2]: OFFSOURCE_CLUSTERED File
# pp_outs[3]: ONSOURCE_CLUSTERED File
# pp_outs[4]: OFFTRIAL_1_CLUSTERED File
# ...
# pp_outs[4+N]: OFFTRIAL_N_CLUSTERED File
# pp_outs[-2]: FOUNDMISSED FileList covering all injection sets
# pp_outs[-1]: FOUNDMISSED-FILTERED FileList covering all injection sets
# in the same order as pp_outs[-2]
# Begin setting up trig combiner job(s)
# Select executable class and initialize
exe_class = _select_grb_pp_class(wf, "trig_combiner")
job_instance = exe_class(wf.cp, "trig_combiner")
# Create node for coherent no injections jobs
node, trig_files = job_instance.create_node(wf.ifos, seg_dir, segment,
insp_files, pp_dir)
wf.add_node(node)
pp_outs.append(trig_files)
# Trig clustering for each trig file
exe_class = _select_grb_pp_class(wf, "trig_cluster")
job_instance = exe_class(wf.cp, "trig_cluster")
for trig_file in trig_files:
# Create and add nodes
node, out_file = job_instance.create_node(trig_file, pp_dir)
wf.add_node(node)
pp_outs.append(out_file)
# Find injections from triggers
exe_class = _select_grb_pp_class(wf, "inj_finder")
job_instance = exe_class(wf.cp, "inj_finder")
inj_find_files = FileList([])
for inj_tag in inj_tags:
tag_inj_files = FileList([f for f in inj_files
if inj_tag in f.tags])
# The here stems from the injection group information
# being stored in the second tag. This could be improved
# depending on the final implementation of injections
tag_insp_files = FileList([f for f in inj_insp_files
if inj_tag in f.tags[1]])
node, inj_find_file = job_instance.create_node(
tag_inj_files, tag_insp_files,
pp_dir)
wf.add_node(node)
inj_find_files.append(inj_find_file)
pp_outs.append(inj_find_files)
# Combine injections
exe_class = _select_grb_pp_class(wf, "inj_combiner")
job_instance = exe_class(wf.cp, "inj_combiner")
inj_comb_files = FileList([])
for in_file in inj_find_files:
if 'DETECTION' not in in_file.tags:
node, inj_comb_file = job_instance.create_node(in_file,
pp_dir,
in_file.tags,
segment)
wf.add_node(node)
inj_comb_files.append(inj_comb_file)
pp_outs.append(inj_comb_files)
return pp_outs
class PycbcGrbTrigCombinerExecutable(Executable):
""" The class responsible for creating jobs
for ''pycbc_grb_trig_combiner''.
"""
current_retention_level = Executable.ALL_TRIGGERS
def __init__(self, cp, name):
super().__init__(cp=cp, name=name)
self.trigger_name = cp.get('workflow', 'trigger-name')
self.trig_start_time = cp.get('workflow', 'start-time')
self.num_trials = int(cp.get('trig_combiner', 'num-trials'))
def create_node(self, ifo_tag, seg_dir, segment, insp_files,
out_dir, tags=None):
node = Node(self)
node.add_opt('--verbose')
node.add_opt("--ifo-tag", ifo_tag)
node.add_opt("--grb-name", self.trigger_name)
node.add_opt("--trig-start-time", self.trig_start_time)
node.add_opt("--segment-dir", seg_dir)
node.add_input_list_opt("--input-files", insp_files)
node.add_opt("--user-tag", "PYGRB")
node.add_opt("--num-trials", self.num_trials)
# Prepare output file tag
user_tag = f"PYGRB_GRB{self.trigger_name}"
if tags:
user_tag += "_{}".format(tags)
# Add on/off source and off trial outputs
output_files = FileList([])
outfile_types = ['ALL_TIMES', 'OFFSOURCE', 'ONSOURCE']
for i in range(self.num_trials):
outfile_types.append("OFFTRIAL_{}".format(i+1))
for out_type in outfile_types:
out_name = "{}-{}_{}-{}-{}.h5".format(
ifo_tag, user_tag, out_type,
segment[0], segment[1]-segment[0])
out_file = File(ifo_tag, 'trig_combiner', segment,
file_url=os.path.join(out_dir, out_name))
node.add_output(out_file)
output_files.append(out_file)
return node, output_files
class PycbcGrbTrigClusterExecutable(Executable):
""" The class responsible for creating jobs
for ''pycbc_grb_trig_cluster''.
"""
current_retention_level = Executable.ALL_TRIGGERS
def __init__(self, cp, name):
super().__init__(cp=cp, name=name)
def create_node(self, in_file, out_dir):
node = Node(self)
node.add_input_opt("--trig-file", in_file)
# Determine output file name
ifotag, filetag, segment = filename_metadata(in_file.name)
start, end = segment
out_name = "{}-{}_CLUSTERED-{}-{}.h5".format(ifotag, filetag,
start, end-start)
out_file = File(ifotag, 'trig_cluster', segment,
file_url=os.path.join(out_dir, out_name))
node.add_output(out_file)
return node, out_file
class PycbcGrbInjFinderExecutable(Executable):
"""The class responsible for creating jobs for ``pycbc_grb_inj_finder``
"""
current_retention_level = Executable.ALL_TRIGGERS
def __init__(self, cp, exe_name):
super().__init__(cp=cp, name=exe_name)
def create_node(self, inj_files, inj_insp_files,
out_dir, tags=None):
if tags is None:
tags = []
node = Node(self)
node.add_input_list_opt('--input-files', inj_insp_files)
node.add_input_list_opt('--inj-files', inj_files)
ifo_tag, desc, segment = filename_metadata(inj_files[0].name)
desc = '_'.join(desc.split('_')[:-1])
out_name = "{}-{}_FOUNDMISSED-{}-{}.h5".format(
ifo_tag, desc, segment[0], abs(segment))
out_file = File(ifo_tag, 'inj_finder', segment,
os.path.join(out_dir, out_name), tags=tags)
node.add_output(out_file)
return node, out_file
class PycbcGrbInjCombinerExecutable(Executable):
"""The class responsible for creating jobs ``pycbc_grb_inj_combiner``
"""
current_retention_level = Executable.ALL_TRIGGERS
def __init__(self, cp, exe_name):
super().__init__(cp=cp, name=exe_name)
def create_node(self, input_file, out_dir, ifo_tag, segment, tags=None):
if tags is None:
tags = []
node = Node(self)
node.add_input_opt('--input-files', input_file)
out_name = input_file.name.replace('.h5', '-FILTERED.h5')
out_file = File(ifo_tag, 'inj_combiner', segment,
os.path.join(out_dir, out_name), tags=tags)
node.add_output_opt('--output-file', out_file)
return node, out_file
def build_veto_filelist(workflow):
"""Construct a FileList instance containing all veto xml files"""
veto_dir = workflow.cp.get('workflow', 'veto-directory')
veto_files = glob.glob(veto_dir + '/*CAT*.xml')
veto_files = [resolve_url_to_file(vf) for vf in veto_files]
veto_files = FileList(veto_files)
return veto_files
def build_segment_filelist(workflow):
"""Construct a FileList instance containing all segments txt files"""
seg_dir = workflow.cp.get('workflow', 'segment-dir')
file_names = ["bufferSeg.txt", "offSourceSeg.txt", "onSourceSeg.txt"]
seg_files = [os.path.join(seg_dir, fn) for fn in file_names]
seg_files = [resolve_url_to_file(sf) for sf in seg_files]
seg_files = FileList(seg_files)
return seg_files
def make_pygrb_plot(workflow, exec_name, out_dir,
ifo=None, inj_file=None, trig_file=None, tags=None):
"""Adds a node for a plot of PyGRB results to the workflow"""
tags = [] if tags is None else tags
# Initialize job node with its tags
grb_name = workflow.cp.get('workflow', 'trigger-name')
extra_tags = ['GRB'+grb_name]
# TODO: why is inj_set repeated twice in output files?
# if inj_set is not None:
# extra_tags.append(inj_set)
if ifo:
extra_tags.append(ifo)
node = PlotExecutable(workflow.cp, exec_name, ifos=workflow.ifos,
out_dir=out_dir,
tags=tags+extra_tags).create_node()
# Pass the trigger file as an input File instance
# if exec_name in ['pygrb_plot_chisq_veto', 'pygrb_plot_coh_ifosnr',
# 'pygrb_plot_null_stats', 'pygrb_plot_skygrid',
# 'pygrb_plot_snr_timeseries']:
if trig_file is not None:
node.add_input_opt('--trig-file', resolve_url_to_file(trig_file))
# Pass the veto and segment files and options
if workflow.cp.has_option('workflow', 'veto-category'):
node.add_opt('--veto-category',
workflow.cp.get('workflow', 'veto-category'))
# FIXME: move to next if within previous one and else Raise error?
if workflow.cp.has_option('workflow', 'veto-files'):
veto_files = build_veto_filelist(workflow)
node.add_input_list_opt('--veto-files', veto_files)
# TODO: check this for pygrb_efficiency and pygrb_plot_stats_distribution
# They originally wanted seg_files
if exec_name in ['pygrb_plot_injs_results', 'pygrb_efficiency',
'pygrb_plot_snr_timeseries',
'pygrb_plot_stats_distribution']:
trig_time = workflow.cp.get('workflow', 'trigger-time')
node.add_opt('--trigger-time', trig_time)
# Other shared tuning values
if exec_name not in ['pygrb_plot_skygrid', 'pygrb_plot_coh_ifosnr']:
if not (exec_name == 'pygrb_plot_snr_timeseries' and
tags[0] != 'reweighted'):
for opt in ['chisq-index', 'chisq-nhigh', 'null-snr-threshold',
'snr-threshold', 'newsnr-threshold',
'sngl-snr-threshold', 'null-grad-thresh',
'null-grad-val']:
if workflow.cp.has_option('workflow', opt):
node.add_opt('--'+opt, workflow.cp.get('workflow', opt))
# Pass the injection file as an input File instance
if inj_file is not None and exec_name not in \
['pygrb_plot_skygrid', 'pygrb_plot_stats_distribution']:
fm_file = resolve_url_to_file(inj_file)
node.add_input_opt('--found-missed-file', fm_file)
# IFO option
if ifo:
node.add_opt('--ifo', ifo)
# Additional input files (passed as File instances)
# if exec_name in ['pygrb_plot_injs_results', 'pygrb_efficiency']:
# missed_file = inj_file
# node.add_input_opt('--missed-file', missed_file)
# FIXME: need found-missed-file option
# Output files and final input file (passed as a File instance)
if exec_name == 'pygrb_efficiency':
# In this case tags[0] is the offtrial number
onsource_file = configparser_value_to_file(workflow.cp,
'workflow', 'onsource-file')
node.add_input_opt('--onsource-file', onsource_file)
node.new_output_file_opt(workflow.analysis_time, '.png',
'--background-output-file',
tags=extra_tags+['max_background'])
node.new_output_file_opt(workflow.analysis_time, '.png',
'--onsource-output-file',
tags=extra_tags+['onsource'])
else:
node.new_output_file_opt(workflow.analysis_time, '.png',
'--output-file', tags=extra_tags)
if exec_name in ['pygrb_plot_coh_ifosnr', 'pygrb_plot_null_stats'] \
and 'zoomin' in tags:
node.add_opt('--zoom-in')
# Quantity to be displayed on the y-axis of the plot
if exec_name in ['pygrb_plot_chisq_veto', 'pygrb_plot_null_stats',
'pygrb_plot_snr_timeseries']:
node.add_opt('--y-variable', tags[0])
# Quantity to be displayed on the x-axis of the plot
elif exec_name == 'pygrb_plot_stats_distribution':
node.add_opt('--x-variable', tags[0])
elif exec_name == 'pygrb_plot_injs_results':
# Variables to plot on x and y axes
node.add_opt('--y-variable', tags[0])
node.add_opt('--x-variable', tags[1])
# Flag to plot found over missed or missed over found
if tags[2] == 'missed-on-top':
node.add_opt('--'+tags[2])
# Enable log axes
subsection = '_'.join(tags[0:2])
for log_flag in ['x-log', 'y-log']:
if workflow.cp.has_option_tags(exec_name, log_flag,
tags=[subsection]):
node.add_opt('--'+log_flag)
# Add job node to workflow
workflow += node
return node, node.output_files
def make_info_table(workflow, out_dir, tags=None):
"""Setup a job to create an html snippet with the GRB trigger information.
"""
tags = [] if tags is None else tags
# Exectuable
exec_name = 'pygrb_grb_info_table'
# Initialize job node
grb_name = workflow.cp.get('workflow', 'trigger-name')
extra_tags = ['GRB'+grb_name, 'INFO_TABLE']
node = PlotExecutable(workflow.cp, exec_name,
ifos=workflow.ifos, out_dir=out_dir,
tags=tags+extra_tags).create_node()
# Options
node.add_opt('--trigger-time', workflow.cp.get('workflow', 'trigger-time'))
node.add_opt('--ra', workflow.cp.get('workflow', 'ra'))
node.add_opt('--dec', workflow.cp.get('workflow', 'dec'))
node.add_opt('--sky-error', workflow.cp.get('workflow', 'sky-error'))
node.add_opt('--ifos', ' '.join(workflow.ifos))
node.new_output_file_opt(workflow.analysis_time, '.html',
'--output-file', tags=extra_tags)
# Add job node to workflow
workflow += node
return node, node.output_files
def make_pygrb_injs_tables(workflow, out_dir, # exclude=None, require=None,
inj_set=None, tags=None):
"""Adds a PyGRB job to make quiet-found and missed-found injection tables.
"""
tags = [] if tags is None else tags
# Exectuable
exec_name = 'pygrb_page_tables'
# Initialize job node
grb_name = workflow.cp.get('workflow', 'trigger-name')
extra_tags = ['GRB'+grb_name]
# TODO: why is inj_set repeated twice in output files?
if inj_set is not None:
extra_tags.append(inj_set)
node = PlotExecutable(workflow.cp, exec_name,
ifos=workflow.ifos, out_dir=out_dir,
tags=tags+extra_tags).create_node()
# Pass the veto and segment files and options
if workflow.cp.has_option('workflow', 'veto-files'):
veto_files = build_veto_filelist(workflow)
node.add_input_list_opt('--veto-files', veto_files)
trig_time = workflow.cp.get('workflow', 'trigger-time')
node.add_opt('--trigger-time', trig_time)
# Other shared tuning values
for opt in ['chisq-index', 'chisq-nhigh', 'null-snr-threshold',
'veto-category', 'snr-threshold', 'newsnr-threshold',
'sngl-snr-threshold', 'null-grad-thresh', 'null-grad-val']:
if workflow.cp.has_option('workflow', opt):
node.add_opt('--'+opt, workflow.cp.get('workflow', opt))
# Handle input/output for injections
if inj_set:
# Found-missed injection file (passed as File instance)
fm_file = configparser_value_to_file(workflow.cp,
'injections-'+inj_set,
'found-missed-file')
node.add_input_opt('--found-missed-file', fm_file)
# Missed-found and quiet-found injections html output files
for mf_or_qf in ['missed-found', 'quiet-found']:
mf_or_qf_tags = [mf_or_qf.upper().replace('-', '_')]
node.new_output_file_opt(workflow.analysis_time, '.html',
'--'+mf_or_qf+'-injs-output-file',
tags=extra_tags+mf_or_qf_tags)
# Quiet-found injections h5 output file
node.new_output_file_opt(workflow.analysis_time, '.h5',
'--quiet-found-injs-h5-output-file',
tags=extra_tags+['QUIET_FOUND'])
# Handle input/output for onsource/offsource
else:
# Onsource input file (passed as File instance)
onsource_file = configparser_value_to_file(workflow.cp,
'workflow', 'onsource-file')
node.add_input_opt('--onsource-file', onsource_file)
# Loudest offsource triggers and onsource trigger html and h5 output
for src_type in ['onsource-trig', 'offsource-trigs']:
src_type_tags = [src_type.upper().replace('-', '_')]
node.new_output_file_opt(workflow.analysis_time, '.html',
'--loudest-'+src_type+'-output-file',
tags=extra_tags+src_type_tags)
node.new_output_file_opt(workflow.analysis_time, '.h5',
'--loudest-'+src_type+'-h5-output-file',
tags=extra_tags+src_type_tags)
# Add job node to the workflow
workflow += node
return node, node.output_files
# Based on setup_single_det_minifollowups
def setup_pygrb_minifollowups(workflow, followups_file,
dax_output, out_dir,
trig_file=None, tags=None):
"""Create plots that followup the the loudest PyGRB triggers or
missed injections from an HDF file.
Parameters
----------
workflow: pycbc.workflow.Workflow
The core workflow instance we are populating
followups_file: pycbc.workflow.File
The File class holding the triggers/injections.
dax_output: The directory that will contain the dax file.
out_dir: path
The directory to store minifollowups result plots and files
tags: {None, optional}
Tags to add to the minifollowups executables
"""
tags = [] if tags is None else tags
# _workflow.makedir(dax_output)
makedir(dax_output)
# Turn the config file into a File instance
# curr_ifo = single_trig_file.ifo
# config_path = os.path.abspath(dax_output + '/' + curr_ifo + \
config_path = os.path.abspath(dax_output + '/' + \
'_'.join(tags) + '_minifollowup.ini')
workflow.cp.write(open(config_path, 'w'))
config_file = resolve_url_to_file(config_path)
# wikifile = curr_ifo + '_'.join(tags) + 'loudest_table.txt'
wikifile = '_'.join(tags) + 'loudest_table.txt'
# Create the node
exe = Executable(workflow.cp, 'pygrb_minifollowups',
ifos=workflow.ifos, out_dir=dax_output,
tags=tags)
node = exe.create_node()
# Grab and pass all necessary files
if trig_file is not None:
node.add_input_opt('--trig-file', trig_file)
if workflow.cp.has_option('workflow', 'veto-files'):
veto_files = build_veto_filelist(workflow)
node.add_input_list_opt('--veto-files', veto_files)
trig_time = workflow.cp.get('workflow', 'trigger-time')
node.add_opt('--trigger-time', trig_time)
node.add_input_opt('--config-files', config_file)
node.add_input_opt('--followups-file', followups_file)
node.add_opt('--wiki-file', wikifile)
if tags:
node.add_list_opt('--tags', tags)
node.new_output_file_opt(workflow.analysis_time, '.dax', '--dax-file')
node.new_output_file_opt(workflow.analysis_time, '.dax.map',
'--output-map')
name = node.output_files[0].name
assert name.endswith('.dax')
map_file = node.output_files[1]
assert map_file.name.endswith('.map')
node.add_opt('--workflow-name', name)
node.add_opt('--output-dir', out_dir)
workflow += node
# Execute this in a sub-workflow
fil = node.output_files[0]
job = SubWorkflow(fil.name, is_planned=False)
job.set_subworkflow_properties(map_file,
staging_site=workflow.staging_site,
cache_file=workflow.cache_file)
job.add_into_workflow(workflow)
def setup_pygrb_results_workflow(workflow, res_dir, trig_file,
inj_files, tags=None,
explicit_dependencies=None):
"""Create subworkflow to produce plots, tables,
and results webpage for a PyGRB analysis.
Parameters
----------
workflow: pycbc.workflow.Workflow
The core workflow instance we are populating
res_dir: The post-processing directory where
results (plots, etc.) will be stored
trig_file: The triggers File object
inj_files: FileList of injection results
tags: {None, optional}
Tags to add to the executables
excplicit_dependencies: nodes that must precede this
"""
tags = [] if tags is None else tags
dax_output = res_dir+'/webpage_daxes'
# _workflow.makedir(dax_output)
makedir(dax_output)
# Turn the config file into a File instance
# config_path = os.path.abspath(dax_output + '/' + \
# '_'.join(tags) + 'webpage.ini')
# workflow.cp.write(open(config_path, 'w'))
# config_file = resolve_url_to_file(config_path)
# Create the node
exe = Executable(workflow.cp, 'pygrb_pp_workflow',
ifos=workflow.ifos, out_dir=dax_output,
tags=tags)
node = exe.create_node()
# Grab and pass all necessary files
node.add_input_opt('--trig-file', trig_file)
if workflow.cp.has_option('workflow', 'veto-files'):
veto_files = build_veto_filelist(workflow)
node.add_input_list_opt('--veto-files', veto_files)
# node.add_input_opt('--config-files', config_file)
node.add_input_list_opt('--inj-files', inj_files)
if tags:
node.add_list_opt('--tags', tags)
node.new_output_file_opt(workflow.analysis_time, '.dax',
'--dax-file', tags=tags)
node.new_output_file_opt(workflow.analysis_time, '.map',
'--output-map', tags=tags)
# + ['MAP'], use_tmp_subdirs=True)
name = node.output_files[0].name
assert name.endswith('.dax')
map_file = node.output_files[1]
assert map_file.name.endswith('.map')
node.add_opt('--workflow-name', name)
# This is the output dir for the products of this node, namely dax and map
node.add_opt('--output-dir', res_dir)
node.add_opt('--dax-file-directory', '.')
# Turn the config file into a File instance
config_path = os.path.abspath(dax_output + '/' + \
'_'.join(tags) + 'webpage.ini')
workflow.cp.write(open(config_path, 'w'))
config_file = resolve_url_to_file(config_path)
node.add_input_opt('--config-files', config_file)
# Track additional ini file produced by pycbc_pygrb_pp_workflow
out_file = File(workflow.ifos, 'pygrb_pp_workflow', workflow.analysis_time,
file_url=os.path.join(dax_output, name+'.ini'))
node.add_output(out_file)
# Add node to the workflow workflow
workflow += node
if explicit_dependencies is not None:
for dep in explicit_dependencies:
workflow.add_explicit_dependancy(dep, node)
# Execute this in a sub-workflow
job = SubWorkflow(name, is_planned=False) # , _id='results')
job.set_subworkflow_properties(map_file,
staging_site=workflow.staging_site,
cache_file=workflow.cache_file)
job.add_into_workflow(workflow)
return node.output_files