#!/usr/bin/env python

# Copyright (C) 2015 Tito Dal Canton
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

"""Program for setting up a workflow which estimates the average PSD of a
given portion of strain data."""

import os.path
import argparse
import logging
import datetime
import sys

import lal
import pycbc_glue.segments
import pycbc
import pycbc.version
import pycbc.workflow
from pycbc.results import save_fig_with_metadata, two_column_layout

logging.basicConfig(format='%(asctime)s:%(levelname)s : %(message)s',
                    level=logging.INFO)

parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--version', action='version',
                    version=pycbc.version.git_verbose_msg)
parser.add_argument('--workflow-name', required=True)
parser.add_argument('-d', '--output-dir', required=True,
                    help='Path to output directory.')
pycbc.workflow.add_workflow_command_line_group(parser)
args = parser.parse_args()

container = pycbc.workflow.Workflow(args, args.workflow_name)
workflow = pycbc.workflow.Workflow(args, args.workflow_name + '-main')
finalize_workflow = pycbc.workflow.Workflow(
    args, args.workflow_name + '-finalization')

pycbc.workflow.makedir(args.output_dir)
os.chdir(args.output_dir)

# layout list
result_plots = []
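# Each tuple appended to result_plots becomes one row of the results page
# assembled by two_column_layout() at the end of this script.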
# put start / end time at top of summary page
time = workflow.analysis_time
s, e = int(time[0]), int(time[1])
s_utc = str(datetime.datetime(*lal.GPSToUTC(s)[0:6]))
e_utc = str(datetime.datetime(*lal.GPSToUTC(e)[0:6]))
time_str = '<center><p><b>GPS Interval [%s,%s). UTC Interval %s - %s. ' \
           'Interval duration = %.3f days.</b></p></center>' \
           % (s, e, s_utc, e_utc, float(e - s) / 86400.0)
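# The interval summary is saved as a small HTML file; save_fig_with_metadata
# attaches the title, caption and generating command line as metadata, which
# the results page renders alongside the content.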
time_file = pycbc.workflow.File(workflow.ifos, 'time', workflow.analysis_time,
                                extension='.html', directory='plots')
pycbc.workflow.makedir('plots')
kwds = {'title': 'Search Workflow Duration (Wall Clock Time)',
        'caption': 'Wall clock start and end times for this invocation of '
                   'the workflow. The command line button shows the '
                   'arguments used to invoke the workflow creation script.',
        'cmd': ' '.join(sys.argv)}
save_fig_with_metadata(time_str, time_file.storage_path, **kwds)
result_plots += [(time_file,)]

# Get segments and find where the data is
seg_dir = 'segments'
veto_cat_files = pycbc.workflow.get_files_for_vetoes(
    workflow, seg_dir, runtime_names=['segments-science-veto'])
science_seg_file, science_segs, _ = \
    pycbc.workflow.get_science_segments(workflow, seg_dir)
sci_ok_seg_file, science_ok_segs, _ = \
    pycbc.workflow.get_analyzable_segments(workflow, science_segs,
                                           veto_cat_files, seg_dir)
datafind_files, analyzable_file, analyzable_segs, analyzable_name = \
    pycbc.workflow.setup_datafind_workflow(workflow, science_ok_segs,
                                           'datafind',
                                           seg_file=science_seg_file)

psd_job_length = int(workflow.cp.get('workflow-matchedfilter',
                                     'analysis-length'))
pad_data = int(workflow.cp.get('calculate_psd', 'pad-data'))
max_segs_per_job = int(workflow.cp.get('workflow-matchedfilter',
                                       'max-segments-per-job'))

# calculate noise PSDs over analyzable segments
psd_files = {}
for ifo, segments in analyzable_segs.items():
    # Split long segments into short ones to use in pycbc_calculate_psd
    # and group the short segments in batches of max_segs_per_job.
    # E.g. with pad_data = 8 and psd_job_length = 2048, a segment [0, 6000)
    # produces the analysis segments [8, 2056) and [2056, 4104); the
    # remainder is too short for another padded job and is dropped.
    # FIXME use the same algorithm already in place for inspiral jobs
    batch = []
    batches = []
    for seg in segments:
        start_time = seg[0] + pad_data
        while start_time + psd_job_length + pad_data <= seg[1]:
            end_time = start_time + psd_job_length
            s = pycbc_glue.segments.segment(start_time, end_time)
            batch.append(s)
            start_time = end_time
            if len(batch) >= max_segs_per_job:
                batches.append(batch)
                batch = []
    # final partial batch
    if len(batch) > 0:
        batches.append(batch)

    # each batch goes into a PSD estimation job
    for job_index, job_segs in enumerate(batches):
        tag = 'PART%.4d' % job_index
        name = 'ANALYZABLE_SPLIT_' + tag
        job_segs_file = pycbc.workflow.SegFile.from_segment_list(
            name, job_segs, name, ifo,
            valid_segment=workflow.analysis_time,
            extension='xml', directory=seg_dir)
        ifo_psd_file = pycbc.workflow.make_psd_file(
            workflow, datafind_files.find_output_with_ifo(ifo),
            job_segs_file, name, 'psds', tags=[tag])
        if ifo not in psd_files:
            psd_files[ifo] = []
        psd_files[ifo].append(ifo_psd_file)

    flat_split_segments = pycbc_glue.segments.segmentlist(
        [s for batch in batches for s in batch])
    logging.info('%.1f s of analyzable %s data reduced to %.1f s after '
                 'segmentation', abs(segments), ifo, abs(flat_split_segments))

# merge all PSDs for each detector
merged_psd_files = []
for ifo, ifo_psd_files in psd_files.items():
    merged_psd_file = pycbc.workflow.merge_psds(workflow, ifo_psd_files,
                                                ifo, 'psds')
    merged_psd_files.append(merged_psd_file)

# average noise PSDs and save to .txt and .xml
pycbc.workflow.make_average_psd(workflow, merged_psd_files, 'psds',
                                output_fmt='.txt')
pycbc.workflow.make_average_psd(workflow, merged_psd_files, 'psds',
                                output_fmt='.xml.gz')

spectrum_plot = pycbc.workflow.make_spectrum_plot(workflow, merged_psd_files,
                                                  'plots')
result_plots += [(spectrum_plot,)]

pycbc.workflow.make_segments_plot(workflow,
                                  pycbc.workflow.FileList([science_seg_file]),
                                  'plots', tags=['SCIENCE_MINUS_CAT1'])
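# The summary table built below reports live time at three stages: DATA
# (science segments), SCIENCE_OK (science time surviving the veto check
# above) and ANALYZABLE_DATA (the subset for which datafind located frame
# data).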
# get data segments to write to segment summary XML file
seg_summ_names = ['DATA', 'SCIENCE_OK', 'ANALYZABLE_DATA']
seg_summ_seglists = [science_segs, science_ok_segs, analyzable_segs]

# write segment summary XML file
seg_list = []
names = []
ifos = []
for segment_list, segment_name in zip(seg_summ_seglists, seg_summ_names):
    for ifo in workflow.ifos:
        seg_list.append(segment_list[ifo])
        names.append(segment_name)
        ifos.append(ifo)
seg_summ_file = pycbc.workflow.SegFile.from_multi_segment_list(
    'WORKFLOW_SEGMENT_SUMMARY', seg_list, names, ifos,
    valid_segment=workflow.analysis_time, extension='xml',
    directory=seg_dir)

# make segment table for summary page
seg_summ_table = pycbc.workflow.make_seg_table(
    workflow, [seg_summ_file], seg_summ_names, 'plots', ['SUMMARY'],
    title_text='Input and output time',
    description='This shows the total amount of input data, analyzable '
                'data, and the time for which PSDs are produced.')
result_plots += [(seg_summ_table,)]

two_column_layout('plots', result_plots)

pycbc.workflow.make_results_web_page(finalize_workflow,
                                     os.path.join(os.getcwd(), 'plots'))

container += workflow
container += finalize_workflow

# make the finalization workflow run only after the main workflow completes
import Pegasus.DAX3 as dax
dep = dax.Dependency(parent=workflow.as_job, child=finalize_workflow.as_job)
container._adag.addDependency(dep)

container.save()

logging.info("Done")
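# Example invocation (script, file and directory names are hypothetical and
# shown for illustration only; --config-files is provided by the standard
# workflow command line group added above):
#
#   pycbc_make_psd_estimation_workflow \
#       --workflow-name psd_estimation \
#       --output-dir output \
#       --config-files workflow.ini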