Revision bb57a39edbb8e0a66ac0d9b514eaa41afaa2a5ca authored by Ian Harry on 17 June 2019, 14:58:05 UTC, committed by GitHub on 17 June 2019, 14:58:05 UTC
1 parent 6e86b2c
pycbc_make_coinc_search_workflow
#!/usr/bin/env python
# Copyright (C) 2013 Ian W. Harry, Alex Nitz
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Program for running multi-detector workflow analysis through coincidence and
then generate post-processing and plots.
"""
import pycbc
import pycbc.version
__author__ = "Alex Nitz <alex.nitz@ligo.org>"
__version__ = pycbc.version.git_verbose_msg
__date__ = pycbc.version.date
__program__ = "pycbc_make_coinc_search_workflow"
import sys
import socket
import pycbc.events, pycbc.workflow as wf
import os, argparse, logging
from six.moves import configparser as ConfigParser
from ligo import segments
import numpy, lal, datetime
from pycbc.results import create_versioning_page, static_table, layout
from pycbc.results.versioning import save_fig_with_metadata
from pycbc.results.metadata import html_escape
def symlink_path(f, path):
if f is None:
return
try:
os.symlink(f.storage_path, os.path.join(path, f.name))
except OSError:
pass
def symlink_result(f, rdir_path):
symlink_path(f, rdir[rdir_path])
# Log to the screen until we know where the output file is
logging.basicConfig(format='%(asctime)s:%(levelname)s : %(message)s',
level=logging.INFO)
parser = argparse.ArgumentParser(description=__doc__[1:])
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument('--workflow-name', default='my_unamed_run')
parser.add_argument("-d", "--output-dir", default=None,
help="Path to output directory.")
wf.add_workflow_command_line_group(parser)
args = parser.parse_args()
wf.makedir(args.output_dir)
container = wf.Workflow(args, args.workflow_name)
workflow = wf.Workflow(args, args.workflow_name + '-main')
finalize_workflow = wf.Workflow(args, args.workflow_name + '-finalization')
os.chdir(args.output_dir)
rdir = layout.SectionNumber('results', ['analysis_time',
'detector_sensitivity',
'single_triggers',
'coincident_triggers',
'injections',
'search_sensitivity',
'open_box_result',
'workflow',
])
wf.makedir(rdir.base)
wf.makedir(rdir['workflow'])
wf_log_file = wf.File(workflow.ifos, 'workflow-log', workflow.analysis_time,
extension='.txt',
directory=rdir['workflow'])
logging.basicConfig(format='%(asctime)s:%(levelname)s : %(message)s',
filename=wf_log_file.storage_path,
level=logging.INFO,
filemode='w')
logfile = logging.FileHandler(filename=wf_log_file.storage_path,mode='w')
logfile.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s:%(levelname)s : %(message)s')
logfile.setFormatter(formatter)
logging.getLogger('').addHandler(logfile)
logging.info("Created log file %s" % wf_log_file.storage_path)
# put start / end time at top of summary page
time = workflow.analysis_time
s, e = int(time[0]), int(time[1])
s_utc = str(datetime.datetime(*lal.GPSToUTC(s)[0:6]))
e_utc = str(datetime.datetime(*lal.GPSToUTC(e)[0:6]))
time_str = '<center><p><b>GPS Interval [%s,%s). ' %(s,e)
time_str += 'UTC Interval %s - %s. ' %(s_utc, e_utc)
time_str += 'Interval duration = %.3f days.</b></p></center>'\
%(float(e-s)/86400.0,)
time_file = wf.File(workflow.ifos, 'time', workflow.analysis_time,
extension='.html',
directory=rdir.base)
kwds = { 'title' : 'Search Workflow Duration (Wall Clock Time)',
'caption' : "Wall clock start and end times for this invocation of "
"the workflow. The command line button shows the "
"arguments used to invoke the workflow creation script.",
'cmd' :' '.join(sys.argv), }
save_fig_with_metadata(time_str, time_file.storage_path, **kwds)
# Get segments and find where the data is
sci_seg_name = 'science'
science_seg_file = wf.get_segments_file(workflow, sci_seg_name, 'segments-science',
rdir['analysis_time/segment_data'])
ssegs = {}
for ifo in workflow.ifos:
ssegs[ifo] = science_seg_file.segment_dict["%s:science" % ifo]
datafind_files, analyzable_file, analyzable_segs, analyzable_name = \
wf.setup_datafind_workflow(workflow,
ssegs, "datafind",
seg_file=science_seg_file)
final_veto_name = ['vetoes']
final_veto_file = wf.FileList([wf.get_segments_file(workflow, final_veto_name[0],
'segments-vetoes',
rdir['analysis_time/segment_data'])])
# Precalculated PSDs
precalc_psd_files = wf.setup_psd_workflow(workflow, analyzable_segs,
datafind_files, "psdfiles")
# Template bank stuff
hdfbank = wf.setup_tmpltbank_workflow(workflow, analyzable_segs,
datafind_files, output_dir="bank",
psd_files=precalc_psd_files,
return_format='hdf')
splitbank_files_fd = wf.setup_splittable_workflow(workflow, hdfbank,
out_dir="bank",
tags=['full_data'])
splitbank_files_inj = wf.setup_splittable_workflow(workflow, hdfbank,
out_dir="bank",
tags=['injections'])
bank_plot = [(wf.make_template_plot(workflow, hdfbank[0],
rdir['coincident_triggers']),)]
# setup the injection files
inj_files, inj_tags = wf.setup_injection_workflow(workflow,
output_dir="inj_files")
######################## Setup the FULL DATA run ##############################
tag = output_dir = "full_data"
ctags = [tag, 'full']
# setup the matchedfilter jobs
ind_insps = insps = wf.setup_matchedfltr_workflow(workflow, analyzable_segs,
datafind_files, splitbank_files_fd,
output_dir, tags = [tag])
insps = wf.merge_single_detector_hdf_files(workflow, hdfbank[0],
insps, output_dir, tags=[tag])
# setup sngl trigger distribution fitting jobs
# 'statfiles' is list of files used in calculating coinc statistic
statfiles = wf.FileList([])
statfiles += wf.setup_trigger_fitting(workflow, insps, hdfbank,
final_veto_file, final_veto_name)
# setup coinc for the filtering jobs
full_insps = insps
final_bg_files = wf.setup_interval_coinc(workflow, hdfbank, insps, statfiles,
final_veto_file, final_veto_name,
output_dir, tags=ctags)
final_bg_file = final_bg_files[0][0]
bin_files = final_bg_files[0][1]
censored_veto = wf.make_foreground_censored_veto(workflow,
final_bg_file, final_veto_file[0], final_veto_name[0],
'closed_box', 'segments')
closed_snrifar = []
all_snrifar = []
for bg_file, bg_bins in final_bg_files:
for bg_bin in bg_bins:
snrifar = wf.make_snrifar_plot(workflow, bg_bin,
rdir['coincident_triggers'],
closed_box=True, tags=bg_bin.tags + ['closed'])
all_snrifar.append(snrifar)
if bg_file == final_bg_file:
closed_snrifar.append(snrifar)
max_hierarchical_removal = workflow.cp.get('workflow-results',
'max-hierarchical-removal')
results_page = []
results_aux_page = []
closed_box_ifars = []
# create the main results page with the two plots and the table
# if there are bins, then there is a pair of plots for each bin
# with no hierarchical_level specified, these results are the
# final stage of hierarchical removal (i.e. the main result).
for bin_file in bin_files:
# Open box
snrifar = wf.make_snrifar_plot(workflow, bin_file,
rdir['open_box_result'], tags=bin_file.tags)
ratehist = wf.make_snrratehist_plot(workflow, bin_file,
rdir['open_box_result'], tags=bin_file.tags)
results_page += [(snrifar, ratehist)]
# Significance page
snrifar_ifar = wf.make_snrifar_plot(workflow, bin_file,
rdir['open_box_result/significance'],
cumulative=False, tags=bin_file.tags + ['ifar'])
ifar_ob = wf.make_ifar_plot(workflow, bin_file,
rdir['open_box_result/significance'],
tags=bin_file.tags + ['open_box'])
symlink_result(snrifar, 'open_box_result/significance')
symlink_result(ratehist, 'open_box_result/significance')
results_aux_page += [(snrifar, ratehist), (snrifar_ifar, ifar_ob)]
# Closed box
ifar_cb = wf.make_ifar_plot(workflow, bin_file,
rdir['coincident_triggers'],
tags=bin_file.tags + ['closed_box'])
closed_box_ifars.append(ifar_cb)
# Open box table
table = wf.make_foreground_table(workflow, final_bg_file,
hdfbank[0], rdir['open_box_result'], singles=insps,
extension='.html', tags=["html"])
symlink_result(table, 'open_box_result/significance')
# Open box results summary
results_page += [(table,)]
layout.two_column_layout(rdir['open_box_result'], results_page)
# Significance page
results_aux_page += [(table,)]
layout.two_column_layout(rdir['open_box_result/significance'], results_aux_page)
fore_xmlall = wf.make_foreground_table(workflow, final_bg_file,
hdfbank[0], rdir['open_box_result'], singles=insps,
extension='.xml', tags=["xmlall"])
fore_xmlloudest = wf.make_foreground_table(workflow, final_bg_file,
hdfbank[0], rdir['open_box_result'], singles=insps,
extension='.xml', tags=["xmlloudest"])
# Closed box coincident triggers page
layout.group_layout(rdir['coincident_triggers'],
closed_box_ifars + all_snrifar + [bank_plot[0][0]])
# Now make the plots showing each stage of hierarchical removal
for hierarchical_level in range(int(max_hierarchical_removal) + 1):
results_hier_page = []
if hierarchical_level == 0:
hierarchical_level_page = \
'open_box_result/no_events_removed_from_background'
elif hierarchical_level == 1:
hierarchical_level_page = \
'open_box_result/loudest_event_removed_from_background'
else:
hierarchical_level_page = \
'open_box_result/loudest_{}_events_removed_from_background'.format(
hierarchical_level)
logging.info('hlevel plotting: %s', hierarchical_level)
for bin_file in bin_files:
snrifar = wf.make_snrifar_plot(workflow, bin_file,
rdir[hierarchical_level_page], tags=bin_file.tags,
hierarchical_level=hierarchical_level)
ratehist = wf.make_snrratehist_plot(workflow, bin_file,
rdir[hierarchical_level_page], tags=bin_file.tags,
hierarchical_level=hierarchical_level)
snrifar_ifar = wf.make_snrifar_plot(workflow, bin_file,
rdir[hierarchical_level_page],
cumulative=False, tags=bin_file.tags + ['ifar'],
hierarchical_level=hierarchical_level)
ifar_ob = wf.make_ifar_plot(workflow, bin_file,
rdir[hierarchical_level_page],
tags=bin_file.tags + ['open_box'],
hierarchical_level=hierarchical_level)
results_hier_page += [(snrifar, ratehist), (snrifar_ifar, ifar_ob)]
table = wf.make_foreground_table(workflow, final_bg_file,
hdfbank[0], rdir[hierarchical_level_page],
singles=insps,
extension='.html', tags=["html"],
hierarchical_level=hierarchical_level)
results_hier_page += [(table,)]
layout.two_column_layout(rdir[hierarchical_level_page], results_hier_page)
snrchi = wf.make_snrchi_plot(workflow, insps, censored_veto,
'closed_box', rdir['single_triggers'], tags=[tag])
layout.group_layout(rdir['single_triggers'], snrchi)
hist_summ = []
for insp in full_insps:
wf.make_singles_plot(workflow, [insp], hdfbank[0], censored_veto,
'closed_box', rdir['single_triggers/%s_binned_triggers' % insp.ifo],
tags=[tag])
# make non-summary hists using the bank file
# currently, none of these are made
wf.make_single_hist(workflow, insp, censored_veto, 'closed_box',
rdir['single_triggers/%s_trigger_histograms' % insp.ifo],
bank_file=hdfbank[0], exclude='summ', tags=[tag])
# make summary hists for all templates together
# currently, 2 per ifo: snr and newsnr
allhists = wf.make_single_hist(workflow, insp, censored_veto, 'closed_box',
rdir['single_triggers/%s_trigger_histograms' % insp.ifo],
require='summ', tags=[tag])
# make hists of newsnr split up by parameter
# currently, 1 per ifo split by template duration
binhists = wf.make_binned_hist(workflow, insp, final_veto_file[0],
final_veto_name[0], rdir['single_triggers/%s_binned_histograms' %
insp.ifo], hdfbank[0], tags=[tag])
# put raw SNR and binned newsnr hist in summary
hist_summ += list(layout.grouper([allhists[0], binhists[0]], 2))
# Calculate the inspiral psds and make plots
psd_files = []
trig_generated_name = 'TRIGGERS_GENERATED'
trig_generated_segs = {}
data_analysed_name = 'DATA_ANALYSED'
data_analysed_segs = {}
insp_files_seg_dict = segments.segmentlistdict()
for ifo, files in zip(*ind_insps.categorize_by_attr('ifo')):
trig_generated_segs[ifo] = segments.segmentlist([f.segment for f in files])
data_analysed_segs[ifo] = \
segments.segmentlist([f.metadata['data_seg'] for f in files])
# Remove duplicates from splitbank
trig_generated_segs[ifo] = \
segments.segmentlist(set(trig_generated_segs[ifo]))
data_analysed_segs[ifo] = \
segments.segmentlist(set(data_analysed_segs[ifo]))
insp_files_seg_dict[ifo + ":" + trig_generated_name] = \
trig_generated_segs[ifo]
insp_files_seg_dict[ifo + ":" + data_analysed_name] = \
data_analysed_segs[ifo]
if datafind_files:
frame_files = datafind_files.find_output_with_ifo(ifo)
else:
frame_files = None
psd_files += [wf.setup_psd_calculate(workflow, frame_files, ifo,
data_analysed_segs[ifo], data_analysed_name, 'psds')]
insp_files_seg_file = wf.SegFile.from_segment_list_dict('INSP_SEGMENTS',
insp_files_seg_dict, valid_segment=workflow.analysis_time,
extension='xml', directory=rdir['analysis_time/segment_data'])
s = wf.make_spectrum_plot(workflow, psd_files, rdir['detector_sensitivity'],
precalc_psd_files=precalc_psd_files)
r = wf.make_range_plot(workflow, psd_files, rdir['detector_sensitivity'],
require='summ')
r2 = wf.make_range_plot(workflow, psd_files, rdir['detector_sensitivity'],
exclude='summ')
det_summ = [(s, r[0] if len(r) != 0 else None)]
layout.two_column_layout(rdir['detector_sensitivity'],
det_summ + list(layout.grouper(r2, 2)))
# run minifollowups on the output of the loudest events
wf.setup_foreground_minifollowups(workflow, final_bg_file,
full_insps, hdfbank[0], insp_files_seg_file,
data_analysed_name, trig_generated_name, 'daxes',
rdir['open_box_result/loudest_event_followup'],
tags=final_bg_file.tags)
for bin_file in bin_files:
if hasattr(bin_file, 'bin_name'):
currdir = rdir['open_box_result/loudest_in_%s_bin' % bin_file.bin_name]
wf.setup_foreground_minifollowups(workflow, bin_file, full_insps,
hdfbank[0], insp_files_seg_file,
data_analysed_name, trig_generated_name,
'daxes', currdir, tags=bin_file.tags)
# Also run minifollowups on loudest sngl detector events
excl_subsecs = set([])
for insp_file in full_insps:
for tag in insp_file.tags:
excl_subsecs.add(tag)
for insp_file in full_insps:
curr_ifo = insp_file.ifo
for subsec in workflow.cp.get_subsections('workflow-sngl_minifollowups'):
if subsec in excl_subsecs:
continue
sec_name = 'workflow-sngl_minifollowups-{}'.format(subsec)
dir_str = workflow.cp.get(sec_name, 'section-header')
currdir = rdir['single_triggers/{}_{}'.format(curr_ifo, dir_str)]
wf.setup_single_det_minifollowups(workflow, insp_file, hdfbank[0],
insp_files_seg_file, data_analysed_name,
trig_generated_name, 'daxes', currdir,
veto_file=censored_veto,
veto_segment_name='closed_box',
statfiles=statfiles,
tags=insp_file.tags + [subsec])
################## Setup segment / veto related plots #########################
wf.make_segments_plot(workflow, [insp_files_seg_file],
rdir['analysis_time/segments'], tags=[trig_generated_name])
wf.make_gating_plot(workflow, full_insps, rdir['analysis_time/gating'],
tags=['full_data'])
if workflow.cp.has_option_tags('workflow-matchedfilter',
'plot-throughput', tags=[tag]):
wf.make_throughput_plot(workflow, full_insps, rdir['workflow/throughput'],
tags=['full_data'])
# make segment table and plot for summary page
curr_files = [science_seg_file, analyzable_file,
insp_files_seg_file]
curr_names = [sci_seg_name, analyzable_name,
trig_generated_name]
seg_summ_table = wf.make_seg_table(workflow, curr_files, curr_names,
rdir['analysis_time/segments'], ['SUMMARY'],
title_text='Input and output',
description='This shows the total amount of input '
'data, analyzable data, and the time for '
'which triggers are produced.')
seg_summ_plot = wf.make_seg_plot(workflow, curr_files,
rdir['analysis_time/segments'],
curr_names, ['SUMMARY'])
curr_files = [insp_files_seg_file] + final_veto_file
# Add in singular veto files
curr_files = curr_files + [science_seg_file]
curr_names = [trig_generated_name + '&' + veto_name\
for veto_name in final_veto_name]
# And SCIENCE - CAT 1 vetoes explicitly.
curr_names += [sci_seg_name + '&' + 'VETO_CAT1']
veto_summ_table = wf.make_seg_table(workflow, curr_files, curr_names,
rdir['analysis_time/segments'], ['VETO_SUMMARY'],
title_text='Time removed by vetoes',
description='This shows the time removed from the '
'output time by the vetoes applied to the '
'triggers.')
############################## Setup the injection runs #######################
inj_coincs = wf.FileList()
files_for_combined_injfind = []
for inj_file, tag in zip(inj_files, inj_tags):
ctags = [tag, 'inj']
output_dir = '%s_coinc' % tag
if workflow.cp.has_option_tags('workflow-injections',
'compute-optimal-snr', tags=[tag]):
optimal_snr_file = wf.compute_inj_optimal_snr(
workflow, inj_file, psd_files, 'inj_files', tags=[tag])
file_for_injfind = optimal_snr_file
else:
file_for_injfind = inj_file
if workflow.cp.has_option_tags('workflow-injections', 'inj-cut', tags=[tag]):
file_for_vetoes = wf.cut_distant_injections(
workflow, file_for_injfind, 'inj_files', tags=[tag])
else:
file_for_vetoes = inj_file
if workflow.cp.has_option_tags('workflow-injections', 'strip-injections',
tags=[tag]):
small_inj_file = wf.veto_injections(workflow, file_for_vetoes,
insp_files_seg_file, trig_generated_name,
"inj_files", tags=[tag])
else:
small_inj_file = file_for_vetoes
files_for_combined_injfind.append(file_for_injfind)
# setup the matchedfilter jobs
insps = wf.setup_matchedfltr_workflow(workflow, analyzable_segs,
datafind_files, splitbank_files_inj,
output_dir, injection_file=small_inj_file,
tags = [tag])
insps = wf.merge_single_detector_hdf_files(workflow, hdfbank[0],
insps, output_dir, tags=[tag])
inj_coinc = wf.setup_interval_coinc_inj(workflow, hdfbank,
full_insps, insps, statfiles, bin_files,
final_veto_file[0], final_veto_name[0],
output_dir, tags = ctags)
found_inj = wf.find_injections_in_hdf_coinc(workflow, wf.FileList([inj_coinc]),
wf.FileList([file_for_injfind]),
final_veto_file[0], final_veto_name[0],
output_dir, tags=ctags)
inj_coincs += [inj_coinc]
#foundmissed/sensitivity plots that go in the subsection wells
s = wf.make_sensitivity_plot(workflow, found_inj,
rdir['search_sensitivity/%s' % tag],
exclude=['all', 'summ'], require='sub', tags=ctags)
f = wf.make_foundmissed_plot(workflow, found_inj,
rdir['injections/%s' % tag],
exclude=['all', 'summ'], require='sub', tags=[tag])
# Extra foundmissed/sensitivity plots
wf.make_sensitivity_plot(workflow, found_inj,
rdir['search_sensitivity/%s' % tag],
exclude=['all', 'summ', 'sub'], tags=ctags)
wf.make_foundmissed_plot(workflow, found_inj,
rdir['injections/%s' % tag],
exclude=['all', 'summ', 'sub'], tags=[tag])
found_table = wf.make_inj_table(workflow, found_inj,
rdir['injections/%s' % tag], singles=insps, tags=[tag + 'found'])
missed_table = wf.make_inj_table(workflow, found_inj,
rdir['injections/%s' % tag], missed=True, tags=[tag + 'missed'])
for inj_insp, trig_insp in zip(insps, full_insps):
f += wf.make_coinc_snrchi_plot(workflow, found_inj, inj_insp,
final_bg_file, trig_insp,
rdir['injections/%s' % tag], tags=[tag])
inj_layout = list(layout.grouper(f, 2)) + [(found_table,), (missed_table,)]
if len(s) > 0: layout.group_layout(rdir['search_sensitivity/%s' % tag], s)
if len(f) > 0: layout.two_column_layout(rdir['injections/%s' % tag], inj_layout)
# run minifollowups on nearest missed injections
curr_dir_nam = 'injections/followup_of_{}'.format(tag)
if workflow.cp.has_option_tags('workflow-injection_minifollowups',
'subsection-suffix', tags=[tag]):
suf_str = workflow.cp.get_opt_tags('workflow-injection_minifollowups',
'subsection-suffix', tags=[tag])
curr_dir_nam += '_' + suf_str
currdir = rdir[curr_dir_nam]
wf.setup_injection_minifollowups(workflow, found_inj, small_inj_file,
insps, hdfbank[0], insp_files_seg_file,
data_analysed_name, trig_generated_name,
'daxes', currdir, tags=[tag])
# If option given, make throughput plots
if workflow.cp.has_option_tags('workflow-matchedfilter',
'plot-throughput', tags=[tag]):
wf.make_throughput_plot(workflow, insps, rdir['workflow/throughput'],
tags=[tag])
# Make combined injection plots
inj_summ = []
if len(files_for_combined_injfind) > 0:
found_inj = wf.find_injections_in_hdf_coinc(workflow, inj_coincs,
files_for_combined_injfind, censored_veto,
'closed_box', 'allinj', tags=['ALLINJ'])
sen = wf.make_sensitivity_plot(workflow, found_inj, rdir['search_sensitivity'],
require='all', tags=['ALLINJ'])
layout.group_layout(rdir['search_sensitivity'], sen)
inj = wf.make_foundmissed_plot(workflow, found_inj, rdir['injections'],
require='all', tags=['ALLINJ'])
layout.group_layout(rdir['injections'], inj)
# Make summary page foundmissed and sensitivity plot
sen = wf.make_sensitivity_plot(workflow, found_inj,
rdir['search_sensitivity'], require='summ', tags=['ALLINJ'])
inj = wf.make_foundmissed_plot(workflow, found_inj,
rdir['injections'], require='summ', tags=['ALLINJ'])
inj_summ = list(layout.grouper(inj + sen, 2))
# make analysis time summary
analysis_time_summ = [time_file, seg_summ_plot]
for f in analysis_time_summ:
symlink_result(f, 'analysis_time')
layout.single_layout(rdir['analysis_time'], (analysis_time_summ))
# make full summary
summ = ([(time_file,)] + [(seg_summ_plot,)] +
[(seg_summ_table, veto_summ_table)] + det_summ + hist_summ +
bank_plot + list(layout.grouper(closed_snrifar, 2)) + inj_summ)
for row in summ:
for f in row:
symlink_path(f, rdir.base)
layout.two_column_layout(rdir.base, summ)
# save global config file to results directory
base = rdir['workflow/configuration']
wf.makedir(base)
ini_file_path = os.path.join(base, 'configuration.ini')
with open(ini_file_path, 'w') as ini_fh:
container.cp.write(ini_fh)
ini_file = wf.FileList([wf.File(workflow.ifos, '', workflow.analysis_time,
file_url='file://'+ini_file_path)])
layout.single_layout(base, ini_file)
# Create versioning information
create_versioning_page(rdir['workflow/version'], container.cp)
# Create the final log file
log_file_html = wf.File(workflow.ifos, 'WORKFLOW-LOG', workflow.analysis_time,
extension='.html',
directory=rdir['workflow'])
# Create a page to contain a dashboard link
dashboard_file = wf.File(workflow.ifos, 'DASHBOARD', workflow.analysis_time,
extension='.html',
directory=rdir['workflow'])
dashboard_str = """<center><p style="font-size:20px"><b><a href="PEGASUS_DASHBOARD_URL" target="_blank">Pegasus Dashboard Page</a></b></p></center>"""
kwds = { 'title' : 'Pegasus Dashboard',
'caption' : "Link to Pegasus Dashboard",
'cmd' : "PYCBC_SUBMIT_DAX_ARGV", }
save_fig_with_metadata(dashboard_str, dashboard_file.storage_path, **kwds)
# Create pages for the submission script to write data
wf.makedir(rdir['workflow/dax'])
wf.makedir(rdir['workflow/input_map'])
wf.makedir(rdir['workflow/output_map'])
wf.makedir(rdir['workflow/planning'])
wf.make_results_web_page(finalize_workflow, os.path.join(os.getcwd(),
rdir.base))
container += workflow
container += finalize_workflow
import Pegasus.DAX3 as dax
dep = dax.Dependency(parent=workflow.as_job, child=finalize_workflow.as_job)
container._adag.addDependency(dep)
container.save()
# Protect the open box results folder
os.chmod(rdir['open_box_result'], 0o0700)
logging.info("Written dax.")
# Close the log and flush to the html file
logging.shutdown()
with open (wf_log_file.storage_path, "r") as logfile:
logdata=logfile.read()
log_str = """
<p>Workflow generation script created workflow in output directory: %s</p>
<p>Workflow name is: %s</p>
<p>Workflow generation script run on host: %s</p>
<pre>%s</pre>
""" % (os.getcwd(), args.workflow_name, socket.gethostname(), logdata)
kwds = { 'title' : 'Workflow Generation Log',
'caption' : "Log of the workflow script %s" % sys.argv[0],
'cmd' :' '.join(sys.argv), }
save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds)
layout.single_layout(rdir['workflow'], ([dashboard_file,log_file_html]))
Computing file changes ...