Raw File
pycbc_make_inference_workflow
#!/bin/env python

# Copyright (C) 2016 Christopher M. Biwer, Alexander Harvey Nitz
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
""" Creates a DAX for a parameter estimation workflow.
"""

import os, argparse, logging, h5py
import pycbc.workflow as wf
import pycbc.workflow.inference_followups as inffu
import pycbc.workflow.minifollowups as mini
import pycbc.workflow.pegasus_workflow as wdax
import pycbc.version
from glue import segments
from pycbc.results import layout
from pycbc.types import MultiDetOptionAction
from pycbc.workflow import WorkflowConfigParser

def to_file(path, ifo=None):
    """ Takes a str and returns a pycbc.workflow.pegasus_workflow.File
    instance.
    """
    fil = wdax.File(os.path.basename(path))
    fil.ifo = ifo
    path = os.path.abspath(path)
    fil.PFN(path, "local")
    return fil

# command line parser
parser = argparse.ArgumentParser(description=__doc__[1:])

# version option
parser.add_argument("--version", action="version",
    version=pycbc.version.git_verbose_msg, 
    help="Prints version information.")

# workflow options
parser.add_argument("--workflow-name", default="my_unamed_inference_run",
    help="Name of the workflow to append in various places.")
parser.add_argument("--tags", nargs="+", default=[],
    help="Tags to apend in various places.")
parser.add_argument("--output-dir", default=None,
    help="Path to output directory.")
parser.add_argument("--output-map", required=True,
    help="Path to output map file.")
parser.add_argument("--output-file", required=True,
    help="Path to DAX file.")

# input workflow file options
parser.add_argument("--bank-file", default=None,
    help="HDF format template bank file.")
parser.add_argument("--statmap-file", default=None,
    help="HDF format clustered coincident trigger result file.")
parser.add_argument("--single-detector-triggers", nargs="+", default=None,
    action=MultiDetOptionAction,
    help="HDF format merged single detector trigger files.")

# analysis time option
# only required if not using input workflow file options
parser.add_argument("--gps-end-time", type=float, nargs="+", default=None,
    help="Times to analyze. If given workflow files then this option is ignored.")

# input configuration file options
parser.add_argument("--inference-config-file", type=str, required=True,
    help="workflow.WorkflowConfigParser parsable file with proir information.")
parser.add_argument("--prior-section", type=str, default="prior",
    help="Name of the section in inference configuration file that contains priors.")

# add option groups
wf.add_workflow_command_line_group(parser)

# parser command line
opts = parser.parse_args()

# setup log
logging.basicConfig(format="%(asctime)s:%(levelname)s : %(message)s", 
                    level=logging.INFO)

# sanity check that user is either using workflow or command line
workflow_options = opts.bank_file != None \
                   and opts.statmap_file != None \
                   and opts.single_detector_triggers != None
if not workflow_options and not (opts.gps_end_time != None):
    raise ValueError("Must use either workflow options or --gps-end-time")

# create workflow
workflow = wf.Workflow(opts, opts.workflow_name)

# create output directory
wf.makedir(opts.output_dir)

# typecast str from command line to File instances
config_file = to_file(opts.inference_config_file)

# if using workflow files to find analysis times
if workflow_options:

    # typecast str from command line to File instances
    tmpltbank_file = to_file(opts.bank_file)
    coinc_file = to_file(opts.statmap_file)
    single_triggers = []
    for ifo in opts.single_detector_triggers:
        fname = opts.single_detector_triggers[ifo]
        single_triggers.append( to_file(fname, ifo=ifo) )

    # get number of events to analyze
    num_events = int(workflow.cp.get_opt_tags("workflow-inference", "num-events", ""))

    # get detection statistic from statmap file
    # if less events that requested to analyze in config file then analyze all events
    f = h5py.File(opts.statmap_file, "r")
    stat = f["foreground/stat"][:]
    if len(stat) < num_events:
        num_events = len(stat)

    # get index for sorted detection statistic
    sorting = stat.argsort()[::-1]

    # get times for loudest events
    times = {
        f.attrs["detector_1"]: f["foreground/time1"][:][sorting][0:num_events],
        f.attrs["detector_2"]: f["foreground/time2"][:][sorting][0:num_events],
    }

# else get analysis times from command line
else:
    times = dict([(ifo,opts.gps_end_time) for ifo in workflow.ifos])
    num_events = len(opts.gps_end_time)

# get frame type and channel name as a dict with IFO as keys
frame_types = {}
channel_names = {}
for ifo in workflow.ifos:
    frame_types[ifo] = workflow.cp.get_opt_tags("workflow-datafind",
                                      "datafind-%s-frame-type"%ifo.lower(), "")
    channel_names[ifo] = workflow.cp.get_opt_tags("workflow",
                                      "%s-channel-name"%ifo.lower(), "")
frame_types_str = " ".join([key+":"+val for key,val in frame_types.iteritems()])
channel_names_str = " ".join([key+":"+val for key,val in channel_names.iteritems()])

# construct Executable for running sampler
inference_exe = wf.Executable(workflow.cp, "inference", ifos=workflow.ifos,
                         out_dir=opts.output_dir)

# get plotting parameters
cp = WorkflowConfigParser([opts.inference_config_file])
variable_args = cp.options("variable_args")

# add derived mass parameters if mass1 and mass2 in [variable_args]
if "mass1" in variable_args and "mass2" in variable_args:
    variable_args += ["mchirp", "eta"]
variable_args.sort()

# create a list that will contain all output files
layouts = []

# loop over number of loudest events to be analyzed
for num_event in range(num_events):

    # set GPS times for reading in data around the event
    avg_end_time = sum([end_times[num_event] \
                          for end_times in times.values()]) / len(times.keys())
    seconds_before_time = int(workflow.cp.get_opt_tags("workflow-inference",
                                            "data-seconds-before-trigger", ""))
    seconds_after_time = int(workflow.cp.get_opt_tags("workflow-inference",
                                            "data-seconds-after-trigger", ""))
    gps_start_time = int(avg_end_time) - seconds_before_time
    gps_end_time = int(avg_end_time) + seconds_after_time

    # make node for running sampler
    node = inference_exe.create_node()
    node.add_opt("--instruments", " ".join(workflow.ifos))
    node.add_opt("--gps-start-time", gps_start_time)
    node.add_opt("--gps-end-time", gps_end_time)
    node.add_opt("--frame-type", frame_types_str)
    node.add_opt("--channel-name", channel_names_str)
    node.add_input_opt("--config-file", config_file)
    analysis_time = segments.segment(gps_start_time, gps_end_time)
    inference_file = node.new_output_file_opt(analysis_time, ".hdf",
                                 "--output-file", tags=[str(num_event)])

    # add node to workflow
    workflow += node

    # add file that prints information about the search event
    if workflow_options:
        layouts += [mini.make_coinc_info(workflow, single_triggers, tmpltbank_file,
                              coinc_file, num_event, 
                              opts.output_dir, tags=opts.tags + [str(num_event)])]

    # add files from posterior summary table for this event
    layouts += [inffu.make_inference_summary_table(workflow, inference_file,
                          opts.output_dir, variable_args=variable_args,
                          analysis_seg=analysis_time,
                          tags=opts.tags + [str(num_event)])]

    # add files from posterior corner plot for this event
    layouts += [inffu.make_inference_posterior_plot(workflow, inference_file,
                          opts.output_dir, variable_args=variable_args,
                          analysis_seg=analysis_time,
                          tags=opts.tags + [str(num_event)])]

    # get files from plotting acceptance rate plot for this event
    rate_files = inffu.make_inference_acceptance_rate_plot(workflow,
                          inference_file,  opts.output_dir,
                          analysis_seg=analysis_time,
                          tags=opts.tags + [str(num_event)])

    # get files from plotting PSD for this event
    psd_file = wf.make_spectrum_plot(workflow, [inference_file], opts.output_dir,
                          tags=opts.tags + [str(num_event)])

    # add a row to the HTML page with accpetance rate on the left
    # and the PSD plot on the right
    layouts += list(layout.grouper(rate_files+[psd_file], 2))

    # add files from plotting parameter samples and autocorrelation
    # function for this event
    files = inffu.make_inference_single_parameter_plots(workflow, inference_file,
                          opts.output_dir, variable_args=variable_args,
                          analysis_seg=analysis_time,
                          tags=opts.tags + [str(num_event)]) 
    layouts += list(layout.grouper(files, 2))

# add files from prior plots for this event
# do not add them to layouts so that they appear in the
# additional files section
for subsection in cp.get_subsections(opts.prior_section):
    inffu.make_inference_prior_plot(workflow, config_file, opts.output_dir,
                          analysis_seg=workflow.analysis_time,
                          sections=[opts.prior_section + "-" + subsection],
                          tags=opts.tags + [subsection])

# write dax
workflow.save(filename=opts.output_file, output_map=opts.output_map)

# write html well
layout.two_column_layout(opts.output_dir, layouts)
back to top