#!/usr/bin/env python

# Copyright (C) 2017 Christopher M. Biwer, Alexander Harvey Nitz
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
""" Creates a DAX for a parameter estimation injection study.
"""

import argparse
import logging
import os
import shlex
import Pegasus.DAX3 as dax
import pycbc.version
import socket
import sys
from pycbc.results import layout
from pycbc.results import metadata
from pycbc.results import versioning
from pycbc.workflow import configuration
from pycbc.workflow import core
from pycbc.workflow import jobsetup
from pycbc.workflow import inference_followups
from pycbc.workflow import plotting

# set command line parser
parser = argparse.ArgumentParser(description=__doc__[1:])

# version option
parser.add_argument(
    "--version", action="version",
    version=pycbc.version.git_verbose_msg,
    help="Prints version information.")

# workflow options
parser.add_argument(
    "--workflow-name",
    default="inference_injection_run",
    help="Name of the workflow to append in various places.")
parser.add_argument(
    "--output-dir",
    default='run',
    help="Path that will contain output data files from the workflow.")
parser.add_argument(
    "--output-map",
    default="output.map",
    help="Path to output map file.")
parser.add_argument(
    "--output-file",
    required=True,
    help="Path to DAX file.")

# inference options
parser.add_argument(
    "--inference-config-file",
    type=str, required=True,
    help="WorkflowConfigParser parsable file with proir information.")

# input data for workflow options
parser.add_argument(
    "--data-type",
    choices=["analytical", "simulated_data", "detector_data"],
    default="detector_data",
    help="Set to 'analytical' to use a test likelihood, 'simulated_data' "
         "to use simulated detector data, or 'detector_data' (the default) "
         "to use real detector data.")
parser.add_argument(
    "--gps-end-time",
    type=float, nargs="+", default=None,
    help="Trigger time(s) to analyze. Only needed with "
         "``--data-type detector_data``.")

# add option groups
configuration.add_workflow_command_line_group(parser)

# parse command line
opts = parser.parse_args()

# make data output directory
core.makedir(opts.output_dir)

# the directory we'll store all output files in
filedir = 'output'

# log to stdout until we know the path to the log output file
log_format = "%(asctime)s:%(levelname)s : %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO)

# create workflow and sub-workflows
container = core.Workflow(opts, opts.workflow_name)
workflow = core.Workflow(opts, opts.workflow_name + "-main")
finalize_workflow = core.Workflow(opts, opts.workflow_name + "-finalization")
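# the container wraps two sub-workflows: the main analysis, and a
# finalization stage that builds the results pages after the analysis jobs
# complete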

# read inference configuration file
cp = configuration.WorkflowConfigParser([opts.inference_config_file])
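# (this file defines the prior; a [variable_params] section listing the
# sampled parameters is assumed below)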

# convert the config file path from the command line to a File instance
config_file = core.File.from_path(opts.inference_config_file)

# change working directory to the output
os.chdir(opts.output_dir)

# sections for output HTML pages
rdir = layout.SectionNumber("results",
                            ["posteriors", "samples", "workflow"])

# make results directories
core.makedir(rdir.base)
core.makedir(rdir["workflow"])

# create files to store workflow log
log_file_txt = core.File(workflow.ifos, "WORKFLOW-LOG",
                         workflow.analysis_time,
                         extension=".txt", directory=rdir["workflow"])
log_file_html = core.File(workflow.ifos, "WORKFLOW-LOG",
                          workflow.analysis_time,
                          extension=".html", directory=rdir["workflow"])

# in addition to stdout, save the log to a file by attaching a file handler
# to the root logger (a second basicConfig call would be a no-op once
# handlers exist)
log_file = logging.FileHandler(filename=log_file_txt.storage_path, mode="w")
log_file.setLevel(logging.INFO)
formatter = logging.Formatter(log_format)
log_file.setFormatter(formatter)
logging.getLogger("").addHandler(log_file)
logging.info("Created log file {}".format(log_file_txt.storage_path))

# construct Executable for creating injections
create_injections_exe = jobsetup.PycbcCreateInjectionsExecutable(
                           workflow.cp, "create_injections",
                           ifo=workflow.ifos, out_dir=filedir)

# construct Executable for running sampler
inference_exe = jobsetup.PycbcInferenceExecutable(
                           workflow.cp, "inference",
                           ifo=workflow.ifos, out_dir=filedir)

# get channel names from workflow configuration file
channel_names = {}
for ifo in workflow.ifos:
    channel_names[ifo] = workflow.cp.get_opt_tags(
                               "workflow", "%s-channel-name" % ifo.lower(), "")

# get which parameters to plot in the PP and recovery plots
if "pp-plot-parameters" in workflow.cp.options("workflow-inference"):
    pp_plot_params = shlex.split(workflow.cp.get_opt_tag(
        "workflow", "pp-plot-parameters", "inference"))
else:
    # just use the variable params from the inference configuration file
    pp_plot_params = cp.options("variable_params")

# get groups of parameters to plot in posterior and samples plots
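# e.g. (group and parameter names illustrative):
#   plot-group-masses = mass1 mass2
# defines a plot group named "masses" containing those parameters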
plot_groups = {}
for option in workflow.cp.options("workflow-inference"):
    if option.startswith("plot-group-"):
        group = option.replace("plot-group-", "").replace("-", "_")
        plot_groups[group] = shlex.split(workflow.cp.get_opt_tag(
                                "workflow", option, "inference"))
all_plot_parameters = sorted([param for group in plot_groups.values()
                              for param in group])
unique_plot_parameters = set(all_plot_parameters)

# formatted strings for output dirs
post_group_fmt = "posteriors/{}_posteriors"
sample_group_fmt = "samples/{}_samples"

# loop over number of injections
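# (read from the workflow configuration file, e.g.
#  [workflow-inference] num-injections = 50; value illustrative)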
n_injections = int(workflow.cp.get_opt_tags(
                                "workflow-inference", "num-injections", ""))
inference_files = core.FileList([])
post_files = core.FileList([])
post_group_files = {g: core.FileList([]) for g in plot_groups}
post_table_files = core.FileList([])
accept_files = core.FileList([])
sample_group_files = {g: core.FileList([]) for g in plot_groups}
for i in range(n_injections):

    # make node for drawing injection parameter values
    if not opts.data_type == "analytical":
        node, injection_file = create_injections_exe.create_node(config_file,
                                                                 seed=i,
                                                                 tags=[str(i)])
        workflow += node
    else:
        injection_file = None

    # set fake strain seed
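    # give each detector a distinct seed for every injection so no two
    # analyses share the same simulated-noise realization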
    if opts.data_type == "simulated_data":
        fake_strain_seed = {ifo : j + i * len(workflow.ifos)
                            for j, ifo in enumerate(workflow.ifos)}
    else:
        fake_strain_seed = None

    # make node for running sampler
    node, inference_file = inference_exe.create_node(
                                     channel_names,
                                     config_file,
                                     injection_file=injection_file,
                                     seed=i, fake_strain_seed=fake_strain_seed,
                                     tags=[str(i)])
    inference_files.append(inference_file)
    workflow += node

    # make node for writing HTML table of parameters
    post_table_files += inference_followups.make_inference_summary_table(
                          workflow, inference_file, rdir["posteriors"],
                          variable_params=unique_plot_parameters,
                          analysis_seg=workflow.analysis_time,
                          tags=[str(i)])

    # make node for plotting posteriors of all parameters
    post_files += inference_followups.make_inference_posterior_plot(
                          workflow, inference_file, rdir["posteriors"],
                          parameters=unique_plot_parameters,
                          analysis_seg=workflow.analysis_time,
                          tags=[str(i)])

    if "inference_rate" in workflow.cp.options("executables"):
        # make node for plotting acceptance rate
        accept_files += inference_followups.make_inference_acceptance_rate_plot(
                              workflow, inference_file, rdir["samples"],
                              analysis_seg=workflow.analysis_time,
                              tags=[str(i)])

    # plot grouped parameters
    for group, parameters in plot_groups.items():

        # make nodes for plotting grouped-parameter posteriors
        post_group_files[group] += \
                inference_followups.make_inference_posterior_plot(
                          workflow, inference_file,
                          rdir[post_group_fmt.format(group)],
                          parameters=parameters,
                          analysis_seg=workflow.analysis_time,
                          tags=[str(i), group])

        if "inference_samples" in workflow.cp.options("executables"):
            # make nodes for plotting samples as a function of sampler
            # iteration
            for j, parameter in enumerate(parameters):
                sample_group_files[group] += \
                        inference_followups.make_inference_samples_plot(
                              workflow, inference_file,
                              rdir[sample_group_fmt.format(group)],
                              parameters=[parameter],
                              analysis_seg=workflow.analysis_time,
                              tags=[str(i), group, str(j)])

# add plots to HTML pages
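# interleave each injection's summary table with its posterior plot so the
# pair appears together; sum(zip(...), ()) flattens the pairs into one
# alternating sequence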
layout.single_layout(rdir["posteriors"],
                     sum(zip(post_table_files, post_files), ()))
layout.single_layout(rdir["samples"], accept_files)
for group in sorted(plot_groups):
    layout.single_layout(rdir[post_group_fmt.format(group)],
                         post_group_files[group])
    layout.single_layout(rdir[sample_group_fmt.format(group)],
                         sample_group_files[group])

# add injection recovery plots
if not opts.data_type == "analytical":
    inj_int_files = inference_followups.make_inference_inj_plots(
                                         workflow,
                                         inference_files, rdir.base,
                                         pp_plot_params,
                                         name="inference_intervals")
    inj_rec_files = inference_followups.make_inference_inj_plots(
                                         workflow,
                                         inference_files, rdir.base,
                                         pp_plot_params,
                                         name="inference_recovery")
    layout.two_column_layout(rdir.base,
                             list(zip(inj_int_files, inj_rec_files)))

# create versioning HTML pages
versioning.create_versioning_page(rdir["workflow/version"], container.cp)

# create node for making HTML pages
plotting.make_results_web_page(finalize_workflow,
                               os.path.join(os.getcwd(), rdir.base))

# add sub-workflows to super-workflow
container += workflow
container += finalize_workflow

# make finalize sub-workflow depend on main sub-workflow
dep = dax.Dependency(parent=workflow.as_job, child=finalize_workflow.as_job)
container._adag.addDependency(dep)

# write dax
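# (the output map tells Pegasus where to place each output file)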
container.save(filename=opts.output_file, output_map_path=opts.output_map)

# save workflow configuration file
base = rdir["workflow/workflow_configuration"]
core.makedir(base)
workflow_ini = workflow.save_config("workflow.ini", base, container.cp)
layout.single_layout(base, workflow_ini)

# save inference configuration file
base = rdir["workflow/inference_configuration"]
core.makedir(base)
prior_ini = workflow.save_config("inference.ini", base, cp)
layout.single_layout(base, prior_ini)

# close the log and copy its contents into the HTML log file
logging.shutdown()
with open(log_file_txt.storage_path, "r") as log_fp:
    log_data = log_fp.read()
log_str = """
<p>Workflow generation script created workflow in output directory: %s</p>
<p>Workflow name is: %s</p>
<p>Workflow generation script run on host: %s</p>
<pre>%s</pre>
""" % (os.getcwd(), opts.workflow_name, socket.gethostname(), log_data)
kwds = {"title" : "Workflow Generation Log",
        "caption" : "Log of the workflow script %s" % sys.argv[0],
        "cmd" : " ".join(sys.argv)}
metadata.save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds)
layout.single_layout(rdir["workflow"], ([log_file_html]))