#!/usr/bin/env python

# Copyright (C) 2015 Tito Dal Canton
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""Program for setting up a workflow which estimates the average PSD of a given
portion of strain data."""

import os.path
import argparse
import datetime
import logging
import sys

import lal
import pycbc
import pycbc.version
import pycbc.workflow
import pycbc_glue.segments
from pycbc.results import save_fig_with_metadata, two_column_layout

logging.basicConfig(format='%(asctime)s:%(levelname)s : %(message)s',
                    level=logging.INFO)

parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--version', action='version',
                    version=pycbc.version.git_verbose_msg)
parser.add_argument('--workflow-name', required=True)
parser.add_argument("-d", "--output-dir", required=True,
                    help="Path to output directory.")
pycbc.workflow.add_workflow_command_line_group(parser)
args = parser.parse_args()

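# The top-level container holds two sub-workflows: the main workflow, which
# does the PSD estimation itself, and a finalization workflow, which builds
# the results web page once the main workflow has completed.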
container = pycbc.workflow.Workflow(args, args.workflow_name)
workflow = pycbc.workflow.Workflow(args, args.workflow_name + '-main')
finalize_workflow = pycbc.workflow.Workflow(args, args.workflow_name + '-finalization')

pycbc.workflow.makedir(args.output_dir)
os.chdir(args.output_dir)

# layout list
result_plots = []

# put start / end time at top of summary page
time = workflow.analysis_time
s, e = int(time[0]), int(time[1])
s_utc = str(datetime.datetime(*lal.GPSToUTC(s)[0:6]))
e_utc = str(datetime.datetime(*lal.GPSToUTC(e)[0:6]))
time_str = ('<center><p><b>GPS Interval [%s,%s). UTC Interval %s - %s. '
            'Interval duration = %.3f days.</b></p></center>'
            % (s, e, s_utc, e_utc, float(e - s) / 86400.0))
time_file = pycbc.workflow.File(workflow.ifos, 'time', workflow.analysis_time,
                                extension='.html', directory='plots')
pycbc.workflow.makedir('plots')
kwds = {'title': 'Workflow Duration (Wall Clock Time)',
        'caption': 'Wall clock start and end times for this invocation of '
                   'the workflow. The command line button shows the arguments '
                   'used to invoke the workflow creation script.',
        'cmd': ' '.join(sys.argv)}
save_fig_with_metadata(time_str, time_file.storage_path, **kwds)
result_plots += [(time_file,)]

# Get segments and find where the data is
seg_dir = 'segments'

veto_cat_files = pycbc.workflow.get_files_for_vetoes(
        workflow, seg_dir, runtime_names=['segments-science-veto'])

science_seg_file, science_segs, _ = \
        pycbc.workflow.get_science_segments(workflow, seg_dir)

sci_ok_seg_file, science_ok_segs, _ = \
        pycbc.workflow.get_analyzable_segments(workflow, science_segs,
                                                veto_cat_files, seg_dir)

datafind_files, analyzable_file, analyzable_segs, analyzable_name = \
        pycbc.workflow.setup_datafind_workflow(workflow, science_ok_segs,
                                               'datafind',
                                               seg_file=science_seg_file)

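# Read the tiling parameters from the configuration: science segments are cut
# into chunks of analysis-length seconds, with pad-data seconds reserved at
# each end of a segment for the strain conditioning of pycbc_calculate_psd,
# and at most max-segments-per-job chunks are grouped into each PSD job.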
psd_job_length = int(workflow.cp.get('workflow-matchedfilter', 'analysis-length'))
pad_data = int(workflow.cp.get('calculate_psd', 'pad-data'))
max_segs_per_job = int(workflow.cp.get('workflow-matchedfilter', 'max-segments-per-job'))

# calculate noise PSDs over analyzable segments
psd_files = {}
for ifo, segments in analyzable_segs.items():
    # split long segments into short ones to use in pycbc_calculate_psd
    # and group short segments in batches of max_segs_per_job
    # FIXME use the same algorithm already in place for inspiral jobs
    batch = []
    batches = []
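    # Tile each segment with consecutive chunks of psd_job_length seconds,
    # leaving pad_data seconds unused at both ends of the segment.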
    for seg in segments:
        start_time = seg[0] + pad_data
        while start_time + psd_job_length + pad_data <= seg[1]:
            end_time = start_time + psd_job_length
            s = pycbc_glue.segments.segment(start_time, end_time)
            batch.append(s)
            start_time = end_time

            if len(batch) >= max_segs_per_job:
                batches.append(batch)
                batch = []

    # final partial batch
    if len(batch) > 0:
        batches.append(batch)

    # each batch goes into a PSD estimation job
    for job_index, job_segs in enumerate(batches):
        tag = 'PART%.4d' % job_index
        name = 'ANALYZABLE_SPLIT_' + tag
        job_segs_file = pycbc.workflow.SegFile.from_segment_list(
                name, job_segs, name, ifo,
                valid_segment=workflow.analysis_time, extension='xml',
                directory=seg_dir)

        ifo_psd_file = pycbc.workflow.make_psd_file(
                workflow, datafind_files.find_output_with_ifo(ifo),
                job_segs_file, name, 'psds', tags=[tag])

        psd_files.setdefault(ifo, []).append(ifo_psd_file)

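    # Time left over at segment boundaries after the fixed-length tiling is
    # dropped; report how much data actually enters the PSD estimation.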
    flat_split_segments = pycbc_glue.segments.segmentlist(
            [s for batch in batches for s in batch])
    logging.info('%.1f s of analyzable %s data reduced to %.1f s after '
                 'segmentation', abs(segments), ifo, abs(flat_split_segments))

# merge all PSDs for each detector
merged_psd_files = []
for ifo, ifo_psd_files in psd_files.items():
    merged_psd_file = pycbc.workflow.merge_psds(workflow, ifo_psd_files, ifo,
                                                'psds')
    merged_psd_files.append(merged_psd_file)

# average noise PSDs and save to .txt and .xml
pycbc.workflow.make_average_psd(workflow, merged_psd_files, 'psds',
                                output_fmt='.txt')
pycbc.workflow.make_average_psd(workflow, merged_psd_files, 'psds',
                                output_fmt='.xml.gz')

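# Plot the merged PSDs of all detectors on a single spectrum figure.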
s = pycbc.workflow.make_spectrum_plot(workflow, merged_psd_files, 'plots')
result_plots += [(s,)]

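# Plot a timeline of the science segments.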
pycbc.workflow.make_segments_plot(workflow,
                                  pycbc.workflow.FileList([science_seg_file]),
                                  'plots', tags=['SCIENCE_MINUS_CAT1'])

# get data segments to write to segment summary XML file
seg_summ_names    = ['DATA', 'SCIENCE_OK', 'ANALYZABLE_DATA']
seg_summ_seglists = [science_segs, science_ok_segs, analyzable_segs]

# write segment summary XML file
seg_list = []
names = []
ifos = []
for segment_list, segment_name in zip(seg_summ_seglists, seg_summ_names):
    for ifo in workflow.ifos:
        seg_list.append(segment_list[ifo])
        names.append(segment_name)
        ifos.append(ifo)
seg_summ_file = pycbc.workflow.SegFile.from_multi_segment_list(
                   'WORKFLOW_SEGMENT_SUMMARY', seg_list, names, ifos,
                   valid_segment=workflow.analysis_time, extension='xml',
                   directory=seg_dir)

# make segment table for summary page
seg_summ_table = pycbc.workflow.make_seg_table(
        workflow, [seg_summ_file], seg_summ_names, 'plots', ['SUMMARY'],
        title_text='Input and output time',
        description='This shows the total amount of input data, analyzable '
                    'data, and the time for which PSDs are produced.')
result_plots += [(seg_summ_table,)]

two_column_layout('plots', result_plots)

pycbc.workflow.make_results_web_page(finalize_workflow,
                                     os.path.join(os.getcwd(), 'plots'))

container += workflow
container += finalize_workflow

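# Make the finalization sub-workflow run only after the main sub-workflow
# completes, so that the results page includes all generated plots.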
import Pegasus.DAX3 as dax
dep = dax.Dependency(parent=workflow.as_job, child=finalize_workflow.as_job)
container._adag.addDependency(dep)

container.save()

logging.info("Done")