#!/usr/bin/env python

# Copyright (C) 2013 Ian W. Harry
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
Program for running multi-detector pycbc analysis through coincidence and then
generate post-processing and plots.
"""
import pycbc
import pycbc.version
__author__  = "Ian Harry <ian.harry@astro.cf.ac.uk>"
__version__ = pycbc.version.git_verbose_msg
__date__    = pycbc.version.date
__program__ = "pycbc_make_coinc_pipedown_workflow"

import os, copy, shutil, argparse, ConfigParser, logging
from glue import segments
import pycbc.workflow as _workflow
from pycbc.workflow import pegasus_workflow
import Pegasus.DAX3 as dax

logging.basicConfig(format='%(asctime)s:%(levelname)s : %(message)s',
                    level=logging.INFO)

_desc = __doc__[1:]
parser = argparse.ArgumentParser(description=_desc)
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument("-d", "--output-dir", default=None,
                    help="Path to output directory.")
parser.add_argument("--enable-hdf-post-processing", action='store_true')
_workflow.add_workflow_command_line_group(parser)
args = parser.parse_args()

container = _workflow.Workflow(args, 'weekly_ahope')
workflow = _workflow.Workflow(args, 'main')
post_workflow = _workflow.Workflow(args, 'post_processing')
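
# 'container' wraps the two sub-workflows: 'main' carries the analysis jobs
# and 'post_processing' the plotting/summary jobs. Both are added to the
# container with a dependency between them and saved as a single dax at the
# end of this script.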

# Needed later for write_ihope_page (wip)
if args.output_dir:
    baseDir = args.output_dir
else:
    baseDir = os.getcwd()
runDir = os.path.join(baseDir, '%d-%d' %tuple(workflow.analysis_time))
if not os.path.exists(runDir):
    os.makedirs(runDir)
os.chdir(runDir)

currDir = os.getcwd()

segDir   = os.path.join(currDir, "segments")
dfDir    = os.path.join(currDir, "datafind")
fdDir    = os.path.join(currDir, "full_data")
tsDir    = os.path.join(currDir, "time_slide_files")
injDir   = os.path.join(currDir, "inj_files")
ppDir    = os.path.join(currDir, "workflow_postproc")
playDir  = os.path.join(currDir, "playground")
hwinjDir = os.path.join(currDir, "hardware_injection_summary")
summ_dir = os.path.join(currDir, "summary")
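
# All of these are subdirectories of the per-run directory
# <output-dir>/<gps-start>-<gps-end> created above.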

# Get segments and find where the data is
# NOTE: not all files are returned to top level, so all_files has some gaps
all_files = _workflow.FileList([])
scienceSegs, segsFileList = _workflow.setup_segment_generation(workflow, segDir)
datafind_files, scienceSegs = _workflow.setup_datafind_workflow(workflow,
                                            scienceSegs, dfDir, segsFileList)
all_files.extend(datafind_files)

veto_cats_string = workflow.cp.get('workflow-segments',
            'segments-veto-categories')
veto_cats = [int(c) for c in veto_cats_string.split(',')]
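
# Illustrative (hypothetical) ini entry for the option read above:
#   [workflow-segments]
#   segments-veto-categories = 2,3,4
# which would give veto_cats = [2, 3, 4].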

# Template bank stuff
bank_files = _workflow.setup_tmpltbank_workflow(workflow, scienceSegs,
                                                datafind_files, dfDir)

if args.enable_hdf_post_processing:
    # Convert the SINGLE FIXED template bank file to an hdf format
    hdfbank = _workflow.convert_bank_to_hdf(workflow, bank_files, dfDir)

splitbank_files = _workflow.setup_splittable_workflow(workflow,
                                                      bank_files, dfDir)

all_files.extend(bank_files)
all_files.extend(splitbank_files)

# Set up the injection files
# FIXME: Pipedown expects the injections to have the random seed as a tag,
# so here we just add a fixed placeholder tag.
inj_files, inj_tags = _workflow.setup_injection_workflow(workflow,
                                           output_dir=injDir, tags=['2134'])
timeSlideFiles = _workflow.setup_timeslides_workflow(workflow,
                                           output_dir=tsDir,
                                           timeSlideSectionName='tisi')

all_files.extend(inj_files)
tags = ["full_data"] + inj_tags
output_dirs = [fdDir]
output_dirs.extend([os.path.join(currDir, tag) for tag in inj_tags])
all_coincs = _workflow.FileList([])
fd_insps = _workflow.FileList([])
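
# Loop over the injection-free 'full_data' run plus one run per injection
# set; zip() pairs [None] + inj_files with the matching tag and output
# directory, so the full_data pass runs with injection_file=None.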
for inj_file, tag, output_dir in zip([None]+inj_files, tags, output_dirs):
    # Only the full_data run needs time slides for background estimation;
    # the injection runs are analysed at zero lag only.
    if tag == 'full_data':
        timeSlideTags = ['zerolag', 'slides']
    else:
        timeSlideTags = ['zerolag']
    insps = _workflow.setup_matchedfltr_workflow(workflow, scienceSegs,
                                           datafind_files, splitbank_files,
                                           output_dir, injection_file=inj_file,
                                           tags = [tag])
    all_files.extend(insps)

    # Setup hdf coincidence and post-processing/plotting
    if args.enable_hdf_post_processing:
        # FIXME: Workaround for the current segment module; the cumulative
        # CAT_3 veto file is hard-coded here.
        final_veto_file = segsFileList.find_output_with_tag('CUMULATIVE_CAT_3')
        final_veto_name = ['VETO_CAT3_CUMULATIVE']
        cum_veto_files = _workflow.FileList()

        insps_hdf = _workflow.convert_trig_to_hdf(workflow, hdfbank, insps,
                                                  output_dir, tags=[tag])
        insps_hdf = _workflow.merge_single_detector_hdf_files(workflow,
                            hdfbank[0], insps_hdf, output_dir, tags=[tag])

        if tag == 'full_data':
            ctags = [tag, 'full']
            full_insps = insps_hdf
            final_bg_file = _workflow.setup_interval_coinc(workflow, hdfbank,
                            insps_hdf, final_veto_file, final_veto_name,
                            output_dir, tags=ctags)
            _workflow.make_foreground_table(workflow, final_bg_file[0],
                                      hdfbank[0], final_veto_name[0],
                                      'plots/foreground', tags=[tag])

            _workflow.make_snrchi_plot(workflow, insps_hdf,
                                      final_veto_file[0], final_veto_name[0],
                                      'plots/background', tags=[tag])

        else:
            ctags = [tag, 'inj']
            # The 'full_data' tag is first in the loop, so final_bg_file and
            # full_insps have already been set by the time we get here.
            inj_coinc = _workflow.setup_interval_coinc_inj(workflow, hdfbank,
                            full_insps, insps_hdf, final_bg_file,
                            final_veto_file[0], final_veto_name[0],
                            output_dir, tags=ctags)
            found_inj = _workflow.find_injections_in_hdf_coinc(workflow,
                            _workflow.FileList([inj_coinc]),
                            _workflow.FileList([inj_file]),
                            final_veto_file[0], final_veto_name[0],
                            output_dir, tags=ctags)

            _workflow.make_sensitivity_plot(workflow, found_inj, 'plots/sensitivity',
                                     tags=ctags)
            _workflow.make_foundmissed_plot(workflow, found_inj, 'plots/foundmissed',
                                     tags=[tag])
            for inj_insp, trig_insp in zip(insps_hdf, full_insps):
                _workflow.make_coinc_snrchi_plot(workflow, found_inj, inj_insp, 
                                          final_bg_file[0], trig_insp,
                                          'plots/background', tags=[tag])

    coincs, others = _workflow.setup_coincidence_workflow(workflow, segsFileList,
                                        timeSlideFiles, insps, output_dir,
                                        tags=[tag], veto_cats=veto_cats,
                                        timeSlideTags=timeSlideTags)

    all_files.extend(coincs)
    all_coincs.extend(coincs)

    # Write the summary file if this is full_data
    if tag == 'full_data':
        anal_log_files = _workflow.setup_analysislogging(workflow, segsFileList,
                               insps, args, summ_dir, program_name=__program__)
        fd_insps = insps

# Set up playground trigger jobs
play_exe = _workflow.Executable(workflow.cp, 'pycbc_ligolw_find_playground')
play_segs = _workflow.segment.find_playground_segments([workflow.analysis_time])
if not os.path.exists(playDir):
    os.makedirs(playDir)

for insp in fd_insps:
    if play_segs.intersects_segment(insp.segment):
        play_file = _workflow.File(insp.ifo, 'inspiral',
                         insp.segment, directory=playDir,
                         extension="xml.gz", tags=['playground']+insp.tags)

        play_node = _workflow.Node(play_exe)
        play_node.add_input_opt('--trigger-file', insp)
        play_node.add_output_opt('--output-file', play_file)

        workflow.add_node(play_node)

        all_files.append(play_file)

# Set up the workflow's post-processing; this is still incomplete

postProcPrepFiles = _workflow.setup_postprocessing_preparation(workflow,
                      all_coincs, ppDir, injectionFiles=inj_files,
                      injectionTags=inj_tags, injLessTag='full_data',
                      vetoFiles=segsFileList, veto_cats=veto_cats)

postProcFiles = _workflow.setup_postprocessing(workflow, postProcPrepFiles,
                                           anal_log_files, ppDir, tags=[],
                                           veto_cats=veto_cats)

# Dump out the formatted, combined ini file
with open("workflow_configuration.ini", "w") as iniFP:
    workflow.cp.write(iniFP)

# Get start and end times for summary/plotting jobs
start_time = workflow.analysis_time[0]
end_time = workflow.analysis_time[1]

# Legacy plotting codes use a glue.lal cache file, so create one
cacheFileList = all_files.convert_to_lal_cache()
cacheFileName = os.path.join(currDir, 'ihope_full_cache.cache')
with open(cacheFileName, 'w') as cacheFileFP:
    cacheFileList.tofile(cacheFileFP)
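
# Each cache line names one file in the standard LAL cache five-column
# format: observatory, description, GPS start, duration, URL. An
# illustrative (hypothetical) entry:
#   H1 INSPIRAL_FULL_DATA 1126051217 2048 file://localhost/path/to/file.xml.gz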

########## Plotting follows ########

# Set up summary plots
for tag in ['full_data']:
    plotDir = os.path.join(currDir, '%s_summary_plots' % tag)
    plotFiles = _workflow.setup_summary_plots(post_workflow, all_coincs,
                      cacheFileName,
                      tmpltbank_cachepattern='TMPLTBANK',
                      inspiral_cachepattern='INSPIRAL_FULL_DATA',
                      output_dir=plotDir, tags=[tag])
for tag in ['full_data_slide']:
    plotDir = os.path.join(currDir, '%s_summary_plots' % tag)
    plotFiles = _workflow.setup_summary_plots(post_workflow, all_coincs,
                      cacheFileName,
                      tmpltbank_cachepattern='TMPLTBANK',
                      inspiral_cachepattern='INSPIRAL_PLAYGROUND',
                      output_dir=plotDir, tags=[tag])

# Set up hardware injection summary page
hwinjFiles = _workflow.setup_hardware_injection_page(post_workflow, all_coincs,
                   cacheFileName, output_dir=hwinjDir, tags=['full_data'],
                   inspiral_cachepattern='THINCA_ZEROLAG_FULL_DATA_CAT_1_VETO')

# Set up for running the pipedown plotting codes

# Make the output directory and a logs subdirectory
pipedownPlotDir = os.path.join(currDir, 'coinc_result_plots')
pipedownLogDir = os.path.join(pipedownPlotDir, 'logs')
if not os.path.exists(pipedownLogDir):
    os.makedirs(pipedownLogDir)
os.chdir(pipedownPlotDir)

# Create the necessary ini file
# This is sufficiently different from the original ini file that some editing is
# required.
pipeCp = copy.deepcopy(workflow.cp)
# Create the condor sections
pipeCp.add_section("condor")
pipeCp.set("condor","universe","vanilla")
for item,value in pipeCp.items('executables'):
    pipeCp.set("condor", item, value)

# Write ini file to folder

iniFile = os.path.join(pipedownPlotDir, 'pipedown.ini')
with open(iniFile, "w") as iniFP:
    pipeCp.write(iniFP)

# Set up command to run pipedown_plots

pipedown_log_dir = workflow.cp.get("workflow",'pipedown-log-path')
pipedown_tmp_space = workflow.cp.get("workflow",'pipedown-tmp-space')

pipe_plot_nodes = []
for cat in veto_cats:
    veto_tag = "CUMULATIVE_CAT_%d" %(cat,)
    inputFiles = postProcFiles.find_output_with_tag(veto_tag)
    assert len(inputFiles) == 1
    inputFile = inputFiles[0]
    pipeCommand  = [pipeCp.get("condor","pipedown_plots")]
    pipeCommand.extend(["--tmp-space", pipedown_tmp_space])
    namingPrefix = "FULL_DATA_CAT_%d_VETO" %(cat,)
    pipeCommand.extend(["--naming-prefix", namingPrefix])
    pipeCommand.extend(["--instruments"] + workflow.ifos)
    pipeCommand.extend(["--gps-start-time", str(start_time)])
    pipeCommand.extend(["--gps-end-time", str(end_time)])
    pipeCommand.extend(["--input-file", inputFile.storage_path])
    pipeCommand.extend(["--ihope-cache", cacheFileName])
    pipeCommand.extend(["--simulation-tags"] + inj_tags)
    pipeCommand.extend(["--veto-category", str(cat)])
    pipeCommand.extend(["--log-path", pipedown_log_dir])
    pipeCommand.extend(["--config-file", iniFile])

    # run pipedown_plots
    _workflow.make_external_call(pipeCommand, out_dir=pipedownLogDir,
                       out_basename='pipedown_plots_call')
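
    # pipedown_plots writes a .dag file at workflow generation time; attach
    # it to the post-processing workflow as a sub-DAG node so condor_dagman
    # executes it from pipedownPlotDir.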
    pipePlotDag = iniFile[0:-4] + "_" + namingPrefix + ".dag"
    pipePlot_lfn = os.path.basename(pipePlotDag)
    pipePlotNode = dax.DAG(pipePlot_lfn)

    post_workflow._adag.addDAG(pipePlotNode)

    subdag_file = dax.File(pipePlot_lfn)
    subdag_file.PFN(pipePlotDag, site='local')
    post_workflow._adag.addFile(subdag_file)
    pipePlotNode.addProfile(dax.Profile("dagman", "DIR", pipedownPlotDir))
    pipe_plot_nodes.append(pipePlotNode)


# Return to the run directory
os.chdir(currDir)

# Setup for write_ihope_page

# Need to make an altered .ini file; start from the pipedown_plots .ini
# as it is closer to what is needed
wipCp = copy.deepcopy(pipeCp)
wipCp.add_section('segments')
wipCp.set('segments', 'veto-categories', veto_cats_string)
# Put the veto-definer in the expected location
vetoFile = wipCp.get('workflow-segments', 'segments-veto-definer-file')
vetoFileBase = os.path.basename(vetoFile)
# This line no longer needed as segment file is already there
#shutil.copyfile(vetoFile, os.path.join(currDir,'segments',vetoFileBase))
wipCp.set('segments', 'veto-def-file', vetoFileBase)
# Set the injection information
wipCp.remove_section('injections')
wipCp.add_section('injections')
for tag in inj_tags:
    wipCp.set('injections', tag.lower(), '')
# Write this ini file out
with open('workflow_config_wip.ini', 'w') as wipFP:
    wipCp.write(wipFP)

# Need a second ini file with wip commands
wipConf = ConfigParser.ConfigParser()
wipConf.add_section('main')
wipConf.set('main', 'gps-start-time', str(start_time))
wipConf.set('main', 'gps-end-time', str(end_time))
wipConf.set('main', 'lead', 'Dotty Wot')
wipConf.set('main', 'second', 'Spotty Wot')
wipConf.set('main', 'title', 'Pycbc coincidence analysis')
wipConf.set('main', 'notes', '')
wipConf.set('main', 'ihope-ini-file', 'workflow_config_wip.ini')
wipConf.set('main', 'ihope-directory', baseDir)
htmlOutDir = workflow.cp.get('workflow', 'workflow-html-basedir')
wipConf.set('main', 'html-directory', htmlOutDir)
# Here, for now, use the installed lalapps style file
wipConf.set('main', 'style', '/usr/share/lalapps/write_ihope_style.css')
wipConf.set('main', 'output', 'index.html')
with open('wip.ini', 'w') as wipConfFP:
    wipConf.write(wipConfFP)
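
# write_ihope_page reads wip.ini and is expected to render a single summary
# page (index.html) under html-directory, linking the plots generated above.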

# Now add command to workflow
wip_exe = _workflow.Executable(workflow.cp, 'write_ihope_page')
wip_node = _workflow.Node(wip_exe)
wip_node.add_opt('--config-file', os.path.join(currDir, 'wip.ini'))

post_workflow.add_node(wip_node)
for node in pipe_plot_nodes:
    dep = dax.Dependency(parent=node, child=wip_node._dax_node)
    post_workflow._adag.addDependency(dep)

container += workflow
container += post_workflow

dep = dax.Dependency(parent=workflow.as_job, child=post_workflow.as_job)
container._adag.addDependency(dep)
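
# This dependency ensures the post-processing sub-workflow starts only after
# the main analysis sub-workflow has finished.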

container.save()

logging.info("Wrote the workflow dax.")