#!/usr/bin/env python

# Copyright (C) 2015 Andrew R. Williamson
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
Make workflow for the archival, targeted, coherent inspiral pipeline.
"""

import pycbc.version

__author__ = "Andrew Williamson <andrew.williamson@ligo.org>"
__version__ = pycbc.version.git_verbose_msg
__date__ = pycbc.version.date
__program__ = "pycbc_make_offline_grb_workflow"

import shutil
import sys
import os
import argparse
import logging
import pycbc.workflow as _workflow
from glue.segments import segment, segmentlistdict
from pycbc.results.legacy_grb import make_grb_segments_plot

workflow_name = "pygrb_offline"
logging.basicConfig(format="%(asctime)s:%(levelname)s : %(message)s",
                    level=logging.INFO)

# Parse command line options and instantiate pycbc workflow object
parser = argparse.ArgumentParser()
parser.add_argument("--version", action="version", version=__version__)
_workflow.add_workflow_command_line_group(parser)
args = parser.parse_args()
wflow = _workflow.Workflow(args, workflow_name)
all_files = _workflow.FileList([])
tags = []
initDir = os.getcwd()

logging.info("Generating %s workflow" % workflow_name)

# Setup run directory
if wflow.cp.has_option("workflow", "output-directory"):
    baseDir = wflow.cp.get("workflow", "output-directory")
else:
    baseDir = os.getcwd()
triggername = str(wflow.cp.get("workflow", "trigger-name"))
runDir = os.path.join(baseDir, "GRB%s" % triggername)
logging.info("Workflow will be generated in %s" % runDir)
if not os.path.exists(runDir):
    os.makedirs(runDir)
os.chdir(runDir)

# SEGMENTS
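# Start from the widest possible analysis window: the trigger time padded on
# either side by the maximum allowed segment duration.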
triggertime = int(wflow.cp.get("workflow", "trigger-time"))
start = triggertime - int(wflow.cp.get("workflow-exttrig_segments",
                                       "max-duration"))
end = triggertime + int(wflow.cp.get("workflow-exttrig_segments",
                                     "max-duration"))
wflow.cp = _workflow.set_grb_start_end(wflow.cp, start, end)

# Retrieve segments ahope-style
currDir = os.getcwd()
segDir = os.path.join(currDir, "segments")
sciSegs, segsFileList = _workflow.setup_segment_generation(wflow, segDir)

# Make coherent network segments
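# Set the minimum science time required before and after the trigger so that
# the on-source lies within the analysable part of a filter segment; the
# analysable region depends on the segment overlap and, when analysing
# segment ends, on the inverse spectrum truncation length.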
if wflow.cp.has_option("inspiral", "analyse-segment-end"):
    safety = 1
    deadtime = int(wflow.cp.get("inspiral", "segment-duration")) // 2
    spec_len = int(wflow.cp.get("inspiral", "inverse-spec-length")) // 2
    wflow.cp.set("workflow-exttrig_segments", "min-before",
                 str(deadtime - spec_len - safety))
    wflow.cp.set("workflow-exttrig_segments", "min-after",
                 str(spec_len + safety))
else:
    deadtime = int(wflow.cp.get("inspiral", "segment-duration")) // 4
    wflow.cp.set("workflow-exttrig_segments", "min-before", str(deadtime))
    wflow.cp.set("workflow-exttrig_segments", "min-after", str(deadtime))

single_ifo = wflow.cp.has_option("workflow", "allow-single-ifo-search")
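# Determine how many interferometers have science data around the trigger
# and choose between a coherent and a single-IFO search, exiting with a
# diagnostic plot if the available network cannot support the search.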
if len(sciSegs) == 0:
    plot_met = make_grb_segments_plot(wflow, segmentlistdict(), triggertime,
            triggername, segDir)
    logging.error("No science segments available.")
    sys.exit()
elif len(sciSegs) < 2 and not single_ifo:
    plot_met = make_grb_segments_plot(wflow, segmentlistdict(sciSegs),
            triggertime, triggername, segDir)
    msg = "Science segments exist only for %s. " % list(sciSegs.keys())[0]
    msg += "If you wish to enable single IFO running add the option "
    msg += "'allow-single-ifo-search' to the [workflow] section of your "
    msg += "configuration file."
    logging.error(msg)
    sys.exit()
elif len(sciSegs) < 2:
    logging.info("Generating a single IFO search.")
    onSrc, offSrc = _workflow.get_triggered_single_ifo_segment(wflow, segDir,
                                                               sciSegs)
else:
    onSrc, offSrc = _workflow.get_triggered_coherent_segment(wflow, segDir,
            sciSegs, single_ifo)

sciSegs = segmentlistdict(sciSegs)
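# onSrc is None if no on-source segment satisfying the minimum time before
# and after the trigger could be found: plot the failure criterion and exit.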
if onSrc is None:
    plot_met = make_grb_segments_plot(wflow, sciSegs, triggertime, triggername,
            segDir, fail_criterion=offSrc)
    sys.exit()
else:
    plot_met = make_grb_segments_plot(wflow, sciSegs, triggertime, triggername,
            segDir, coherent_seg=offSrc[list(offSrc.keys())[0]][0])
    segs_plot = _workflow.File(plot_met[0], plot_met[1], plot_met[2],
                               file_url=plot_met[3])
    segs_plot.PFN(segs_plot.cache_entry.path, site="local")
    sciSegs = offSrc
    all_files.extend(_workflow.FileList([segs_plot]))

# How many IFOs do we have in the search?
if len(sciSegs) == 1:
    mf_tag = "sngl"
else:
    mf_tag = "coherent"

# Update analysis time after coherent segment calculation
ifo = list(sciSegs.keys())[0]
padding = int(wflow.cp.get("inspiral", "pad-data"))
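# Restrict the analysis time to the span actually filtered, excluding data
# lost to padding, matched-filter dead time and, where applicable, inverse
# spectrum truncation.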
if wflow.cp.has_option("inspiral", "analyse-segment-end"):
    wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + deadtime - \
                                      spec_len + padding - safety,
                                  int(sciSegs[ifo][0][1]) - spec_len - \
                                      padding - safety)
else:
    wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + deadtime + padding,
                                  int(sciSegs[ifo][0][1]) - deadtime - padding)

wflow.cp = _workflow.set_grb_start_end(wflow.cp, int(sciSegs[ifo][0][0]),
                                       int(sciSegs[ifo][0][1]))
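# The ExtTrig xml file is only generated for coh_PTF runs (see below), but
# it is passed to the injection setup either way, so initialise it here.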
ext_file = None

# DATAFIND
dfDir = os.path.join(currDir, "datafind")
datafind_files, _, sciSegs, _ = _workflow.setup_datafind_workflow(wflow, sciSegs,
        dfDir, segsFileList)
ifo = list(sciSegs.keys())[0]
ifos = ''.join(sciSegs.keys())
wflow.ifos = ifos
datafind_veto_files = _workflow.FileList([])
datafind_veto_files.extend(datafind_files)

# Is this an IPN GRB?
if wflow.cp.has_option("workflow-inspiral", "ipn-search-points") \
        and wflow.cp.has_option("workflow-injections", "ipn-sim-points"):
    search_pts_file, sim_pts_file = _workflow.get_ipn_sky_files(wflow)
    wflow.cp.set("injections", "ipn-gps-time",
            wflow.cp.get("workflow", "trigger-time"))
    IPN = True
    all_files.extend(_workflow.FileList([search_pts_file, sim_pts_file]))
elif wflow.cp.has_option("workflow-inspiral", "ipn-search-points") \
        or wflow.cp.has_option("workflow-injections", "ipn-sim-points"):
    msg = "You have provided only one of 'ipn-search-points' under "
    msg += "[workflow-inspiral] and 'ipn-sim-points' under "
    msg += "[workflow-injections] in your configuration files. If this is an "
    msg += "IPN GRB please provide both, otherwise provide neither."
    logging.error(msg)
    sys.exit()
else:
    IPN = False

# If using coh_PTF_inspiral we need bank_veto_bank.xml
if os.path.basename(wflow.cp.get("executables", "inspiral")) \
                    == "lalapps_coh_PTF_inspiral":
    bank_veto_file = _workflow.get_coh_PTF_files(wflow.cp, ifos, runDir,
                                                 bank_veto=True)
    datafind_veto_files.extend(bank_veto_file)
    
    if IPN:
        datafind_veto_files.extend(_workflow.FileList([search_pts_file]))

    # Make ExtTrig xml file (needed for lalapps_inspinj and summary pages)
    ext_file = _workflow.make_exttrig_file(wflow.cp, ifos, sciSegs[ifo][0],
                                           baseDir)
    all_files.extend(_workflow.FileList([ext_file]))

all_files.extend(datafind_veto_files)

# TEMPLATE BANK AND SPLIT BANK
bank_files = _workflow.setup_tmpltbank_workflow(wflow, sciSegs,
                                                datafind_files, dfDir)
splitbank_files = _workflow.setup_splittable_workflow(wflow, bank_files, dfDir,
                                                      tags=["inspiral"])
all_files.extend(bank_files)
all_files.extend(splitbank_files)

# INJECTIONS
injs = None
inj_tags = []
inj_files = None
inj_caches = None
inj_insp_files = None
inj_insp_caches = None
if wflow.cp.has_section("workflow-injections"):
    injDir = os.path.join(currDir, "injections")
    inj_caches = _workflow.FileList([])
    inj_insp_caches = _workflow.FileList([])

    # Generate injection files
    if IPN:
        inj_files, inj_tags = _workflow.setup_injection_workflow(wflow, injDir,
                exttrig_file=sim_pts_file)
    else:
        inj_files, inj_tags = _workflow.setup_injection_workflow(wflow, injDir,
                exttrig_file=ext_file)
    all_files.extend(inj_files)
    injs = inj_files

    # Either split template bank for injections jobs or use same split banks
    # as for standard matched filter jobs
    if wflow.cp.has_section("workflow-splittable-injections"):
        inj_splitbank_files = _workflow.setup_splittable_workflow(wflow,
                bank_files, injDir, tags=["injections"])
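        # Append the bank number to each split bank's tagged description so
        # that the injection output file names remain unique.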
        for inj_split in inj_splitbank_files:
            split_str = [s for s in inj_split.tagged_description.split("_") \
                         if ("BANK" in s and s[-1].isdigit())]
            if len(split_str) != 0:
                inj_split.tagged_description += "%s_%d" % (inj_split.tag_str,
                       int(split_str[0].replace("BANK", "")))
        all_files.extend(inj_splitbank_files)
    else:
        inj_splitbank_files = _workflow.FileList([])
        inj_splitbank_files.extend(splitbank_files)

    # Split the injection files
    if wflow.cp.has_section("workflow-splittable-split_inspinj"):
        inj_split_files = _workflow.FileList([])
        for inj_file, inj_tag in zip(inj_files, inj_tags):
            inj_file_list = _workflow.FileList([inj_file])
            inj_splits = _workflow.setup_splittable_workflow(wflow,
                    inj_file_list, injDir, tags=["split_inspinj", inj_tag])
            for inj_split in inj_splits:
                split_str = [s for s in \
                             inj_split.tagged_description.split("_") \
                             if ("SPLIT" in s and s[-1].isdigit())]
                if len(split_str) != 0:
                    new = inj_split.tagged_description.replace(split_str[0],
                            "SPLIT_%s" % split_str[0].replace("SPLIT", ""))
                    inj_split.tagged_description = new
            inj_split_files.extend(inj_splits)
        all_files.extend(inj_split_files)
        injs = inj_split_files

    # Generate injection matched filter workflow
    inj_insp_files = _workflow.setup_matchedfltr_workflow(wflow, sciSegs,
            datafind_veto_files, inj_splitbank_files, injDir, injs,
            tags=[mf_tag + "_injections"])
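    # Carry the split bank number through to each injection inspiral file's
    # tagged description.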
    for inj_insp_file in inj_insp_files:
        split_str = [s for s in inj_insp_file.name.split("_") \
                     if ("SPLIT" in s and s[-1].isdigit())]
        if len(split_str) != 0:
            num = split_str[0].replace("SPLIT", "_")
            inj_insp_file.tagged_description += num

    # Make cache files (needed for post-processing)
    for inj_tag in inj_tags:
        files = _workflow.FileList([f for f in injs
                                    if inj_tag in f.tag_str])
        inj_cache = _workflow.File(ifos, "injections", sciSegs[ifo][0],
                                   extension="lcf", directory=injDir,
                                   tags=[inj_tag])
        inj_cache.PFN(inj_cache.cache_entry.path, site="local")
        inj_caches.extend(_workflow.FileList([inj_cache]))
        inj_cache_entries = files.convert_to_lal_cache()
        with open(inj_cache.storage_path, "w") as f:
            inj_cache_entries.tofile(f)

        files = _workflow.FileList([f for f in inj_insp_files
                                    if inj_tag in f.tag_str])
        inj_insp_cache = _workflow.File(ifos, "inspiral_injections",
                                        sciSegs[ifo][0], extension="lcf",
                                        directory=injDir, tags=[inj_tag])
        inj_insp_cache.PFN(inj_insp_cache.cache_entry.path, site="local")
        inj_insp_caches.extend(_workflow.FileList([inj_insp_cache]))
        inj_insp_cache_entries = files.convert_to_lal_cache()
        with open(inj_insp_cache.storage_path, "w") as f:
            inj_insp_cache_entries.tofile(f)

    all_files.extend(inj_caches)
    all_files.extend(inj_insp_files)
    all_files.extend(inj_insp_caches)

# MAIN MATCHED FILTERING
inspDir = os.path.join(currDir, "inspiral")
inspiral_files = _workflow.setup_matchedfltr_workflow(wflow, sciSegs,
        datafind_veto_files, splitbank_files, inspDir,
        tags=[mf_tag + "_no_injections"])
all_files.extend(inspiral_files)
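# Write a LAL cache of the inspiral trigger files; the post-processing stage
# takes its inputs from this cache.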
inspiral_cache = _workflow.File(ifos, "inspiral", sciSegs[ifo][0],
                                extension="lcf", directory=inspDir)
inspiral_cache.PFN(inspiral_cache.cache_entry.path, site="local")
all_files.extend(_workflow.FileList([inspiral_cache]))
inspiral_cache_entries = inspiral_files.convert_to_lal_cache()
with open(inspiral_cache.storage_path, "w") as f:
    inspiral_cache_entries.tofile(f)

# POST-PROCESSING
ppDir = os.path.join(currDir, "post_processing")
post_proc_method = wflow.cp.get_opt_tags("workflow-postproc",
                                         "postproc-method", tags)

# Initialise so the final extend below is safe even if no post-processing
# workflow is generated
pp_files = _workflow.FileList([])
if post_proc_method == "COH_PTF_WORKFLOW":
    # Add parsed config file so it can be linked from summary page
    cp_file_name = "%s_parsed.ini" % workflow_name
    cp_file_url = "file://localhost%s/%s" % (baseDir, cp_file_name)
    cp_file = _workflow.File(ifos, cp_file_name, sciSegs[ifo][0],
                             file_url=cp_file_url)
    cp_file.PFN(cp_file.cache_entry.path, site="local")
    if not os.path.exists(cp_file.storage_path):
        shutil.copy2("%s/%s" % (initDir, cp_file_name), baseDir)

    # Generate post-processing workflow
    html_dir = wflow.cp.get("workflow", "html-dir")
    pp_files = _workflow.setup_coh_PTF_post_processing(wflow, inspiral_files,
            inspiral_cache, ppDir, segDir,
            injection_trigger_files=inj_insp_files, injection_files=injs,
            injection_trigger_caches=inj_insp_caches,
            injection_caches=inj_caches, config_file=cp_file, web_dir=html_dir,
            segments_plot=segs_plot, ifos=ifos, inj_tags=inj_tags)

    # Retrieve style files for webpage
    summary_files = _workflow.get_coh_PTF_files(wflow.cp, ifos, ppDir,
                                                summary_files=True)

    pp_files.extend(_workflow.FileList([cp_file]))
    pp_files.extend(summary_files)

all_files.extend(pp_files)

# COMPILE WORKFLOW AND WRITE DAX
wflow.save()
logging.info("Written dax.")
