Revision 28fa8ecbda12b520d1570fd3f6988ea156fc962b authored by Duncan Brown on 27 September 2015, 12:25:14 UTC, committed by Duncan Brown on 27 September 2015, 12:25:14 UTC
Fix for case where workflow doesn't have a psd section
2 parent s c790a8a + 2768ea8
Raw File
pycbc_pipedown_plots
#!/usr/bin/env python

# Copyright (C) 2013 Ian W. Harry and Collin Capano
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
#

"""
Program for generating a dag to make all the plots that were previously made as
part of pipedown.
"""

import pycbc
import pycbc.version
__author__ = 'Collin Capano <collin.capano@ligo.org>'
__version__ = pycbc.version.git_verbose_msg
__date__    = pycbc.version.date
__program__ = "pycbc_pipedown_plots"

##############################################################################
# import standard modules and append the lalapps prefix to the python path
import os, sys, re, copy
import argparse
import tempfile

##############################################################################
# import the modules we need to build the pipeline
from glue import pipeline
from glue import iterutils
from glue.ligolw import ligolw
from glue.ligolw import lsctables
from glue.ligolw import utils
from glue.ligolw.utils import process
from glue import git_version

##############################################################################
#
#  MAIN PROGRAM
#
##############################################################################

##############################################################################
# parse command-line arguments
#

_desc = __doc__[1:]
parser = argparse.ArgumentParser(description=_desc)
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument('--tmp-space', action="store", required=True,
                    metavar='TMP_PATH', help="""Path to a temporary location on
                    each of the compute nodes that will be used to copy the SQL
                    database to and then run operations on.""")
parser.add_argument('--naming-prefix', action="store", required=True,
                    metavar="NAME", help="""TAG to be used in naming the output
                    files.""")
parser.add_argument('--instruments', nargs="+", required=True, action="store",
                    metavar="X1", help="""Space separated list of all ifos
                    that are present in the database, e.g. H1 L1 V1.""")
parser.add_argument('--gps-start-time', action="store", required=True,
                    metavar="TIME", type=int, help="""Start time. Used for file
                    naming.""")
parser.add_argument("--gps-end-time", action="store", required=True,
                    metavar="TIME", type=int, help="""End time. Used for file
                    naming.""")
parser.add_argument("--input-file", action="store", required=True,
                    metavar="INFILE", help="""Path to the SQL database storing
                    the triggers with FARs and durations computed.""")
parser.add_argument("--ihope-cache", action="store", required=True,
                    metavar="CACHEFILE", help="""Location of the ihope cache
                    file so that minifollowups can find earlier input files.""") 
parser.add_argument("--simulation-tags", nargs="*", required=False,
                    action="store", metavar="SIMTAG", help="""The list of
                    simulation tags. This must correspond to the list in the
                    SQL database.""", default=None)
parser.add_argument("--veto-category", type=int, action="store", metavar="INT", 
                    help="""The veto category as an integer.""")
parser.add_argument("--log-path", action="store", metavar="PATH",
                    help="""Directory on submit node on which to place the
                    various temporary condor products.""")
parser.add_argument("--config-file", action="store", metavar="CONFIGFILE",
                    help="""The configuration file containing instructions
                    about how to set up the various plotting codes.""")
args = parser.parse_args()

from lalapps import inspiral

##############################################################################
# Create log file
logdoc = ligolw.Document()
logdoc.appendChild(ligolw.LIGO_LW())
proc_id = process.register_to_xmldoc(logdoc, __program__, args.__dict__,
                                     version = git_version.id)

##############################################################################
# parse the ini file and initialize
cp = pipeline.DeepCopyableConfigParser()
cp.read(args.config_file)
time_column = cp.get('pipeline', 'time-column')

tmp_space = args.tmp_space
type_prefix = args.naming_prefix
instruments = args.instruments
experiment_start = str(args.gps_start_time)
experiment_duration = str(int(args.gps_end_time) - int(args.gps_start_time))
input_database = args.input_file
ihope_cachefile = args.ihope_cache
if args.simulation_tags:
  sim_tags = args.simulation_tags + ["ALLINJ"]
else:
  sim_tags = []
veto_level = args.veto_category
veto_tag = "CAT_%d_VETO" %(veto_level,)

##############################################################################
# create a log file that the Condor jobs will write to
basename = re.sub(r'\.ini', r'', os.path.basename(args.config_file))
basename = basename + '_' + args.naming_prefix

# FIXME: Add an output directory option

tempfile.tempdir = args.log_path
tempfile.template = '.'.join([ basename, 'dag.log.' ])
logfile = tempfile.mktemp()
fh = open( logfile, "w" )
fh.close()
if not os.path.exists('logs'):
    os.makedirs('logs')

##############################################################################
# create the DAG writing the log to the specified directory
dag = pipeline.CondorDAG(logfile)
dag.set_dag_file(basename)

##############################################################################
# since some files won't be passed through minifollowups, need two sets of
# printsims and printmissed jobs, one with columns, one without
plotslides_job = inspiral.PlotSlidesJob(cp)
plotcumhist_job = inspiral.PlotCumhistJob(cp)

printsims_job = inspiral.LigolwCBCPrintJob(cp, 'printsims', ['printsims'])
printmissed_job = inspiral.LigolwCBCPrintJob(cp, 'printmissed', ['printmissed'])
# time-slide minifollowup jobs require a separate time-slides argument
# and missed injection minifollowups don't use full xml files
minifup_job = inspiral.MiniFollowupsJob(cp)
minifup_ts_job = inspiral.MiniFollowupsJob(cp)
minifup_ts_job.set_time_slides()
minifup_cm_job = inspiral.MiniFollowupsJob(cp)
search_volume_job = inspiral.SearchVolumeJob(cp)
search_upper_limit_job = inspiral.SearchUpperLimitJob(cp)


# the number of and options given to plotfm job is optional and is set by the ini file
# to handle this, we do a similar thing as is done in ihope with inspinj: the 'plot_found/missed'
# section in the ini file gives the names of sections to read for plotfm jobs; these sections are cycled over
# and temporarily re-named to 'plotfm' then passed to the PlotFMJob class to create the job
# FIXME: The tag system was introduced in ahope to deal with this case. If we
#        move this code over to use the pycbc workflow setup, please use the
#        tag system and not this!
plotfm_jobs = []
if cp.has_section('plotfm'):
  for plotfm_tag in cp.options('plotfm'):
    # create a copy of the config parser, blank out the plotfm section, and replace
    # with a new plotfm section with the options taken from plotfm_tag section
    fmcp = copy.deepcopy(cp)
    fmcp.remove_section('plotfm')
    fmcp.add_section('plotfm')
    for fmopt, fmval in cp.items(plotfm_tag):
      fmcp.set('plotfm', fmopt, fmval)
    plotfm_jobs.append( (plotfm_tag, inspiral.PlotFMJob(fmcp)) )

printlc_job = inspiral.LigolwCBCPrintJob(cp, 'printlc', ['printlc'])
plotifar_job = inspiral.PlotIfarJob(cp)

printlc_job.set_sub_file( '.'.join([ basename, 'printlc', 'sub' ]) )
printsims_job.set_sub_file( '.'.join([ basename, 'printsims', 'sub' ]) )
printmissed_job.set_sub_file( '.'.join([ basename, 'printmissed', 'sub' ]) )
minifup_job.set_sub_file( '.'.join([ basename, 'minifollowups', 'sub' ]) )
minifup_ts_job.set_sub_file( '.'.join([ basename, 'time_slides', 'minifollowups', 'sub' ]) )
minifup_cm_job.set_sub_file( '.'.join([ basename, 'closest_missed', 'minifollowups', 'sub' ]) )
plotifar_job.set_sub_file( '.'.join([ basename, 'plotifar', 'sub' ]) )
for fmtag, fmjob in plotfm_jobs:
  fmjob.set_sub_file( '.'.join([ basename, fmtag, 'plotfm', 'sub' ]) )
plotslides_job.set_sub_file( '.'.join([ basename, 'plotslides', 'sub' ]) )
plotcumhist_job.set_sub_file( '.'.join([ basename, 'plotcumhist', 'sub' ]) )
search_volume_job.set_sub_file( '.'.join([ basename, 'search_volume', 'sub' ]) )
search_upper_limit_job.set_sub_file( '.'.join([ basename, 'search_upper_limit', 'sub' ]) )

# set memory requirments for memory intensive jobs
printlc_job.add_condor_cmd("request_memory", "1000")
printsims_job.add_condor_cmd("request_memory", "1000")
minifup_job.add_condor_cmd("request_memory", "2000")
minifup_ts_job.add_condor_cmd("request_memory", "2000")
minifup_cm_job.add_condor_cmd("request_memory", "2000")

############################################################################
# UPPERLIMIT CALCULATIONS
############################################################################

tag = "FULL_DATA_" + veto_tag
weird_veto_string = "VETO_CAT"+str(veto_level)+"_CUMULATIVE"
output_cache_name = ''.join(sorted(instruments)) + "-SEARCH_VOLUME_" + tag
output_cache_name += ".cache"

search_volume_node = inspiral.SearchVolumeNode( search_volume_job )
search_volume_node.set_open_box()
search_volume_node.add_database( input_database )
search_volume_node.set_user_tag( tag )
search_volume_node.set_output_cache( output_cache_name )
search_volume_node.set_veto_segments_name( weird_veto_string )

search_upper_limit_node = inspiral.SearchUpperLimitNode(search_upper_limit_job)
search_upper_limit_node.set_open_box()
search_upper_limit_node.add_input_cache( output_cache_name )
search_upper_limit_node.set_user_tag( tag )
search_upper_limit_node.add_parent( search_volume_node )
dag.add_node( search_volume_node )
dag.add_node( search_upper_limit_node )

# make corresponding jobs for playground (using FAR of expected loudest event)
playtag = tag.replace("FULL_DATA","PLAYGROUND")
search_volume_node = inspiral.SearchVolumeNode( search_volume_job )
output_cache_name = output_cache_name.replace("FULL_DATA","PLAYGROUND")
search_volume_node.add_database( input_database )
search_volume_node.set_user_tag( playtag )
search_volume_node.set_output_cache( output_cache_name )
search_volume_node.set_veto_segments_name( weird_veto_string )

search_upper_limit_node = inspiral.SearchUpperLimitNode(search_upper_limit_job)
search_upper_limit_node.add_input_cache( output_cache_name )
search_upper_limit_node.set_user_tag( playtag )
search_upper_limit_node.add_parent( search_volume_node )
dag.add_node( search_volume_node )
dag.add_node( search_upper_limit_node )

############################################################################
# Summary: Setup PrintLC and MiniFollowup Nodes to generate a summary of 
# loudest non-simulation events

print "\tsetting up printlc and minifollowup nodes..."

# set datatypes to generate files for
datatypes = ['all_data', 'playground', 'slide']

distinct_instrument_sets = [ set(sub_combo)
    for nn in range(2, len(instruments)+1)
    for sub_combo in iterutils.choices( list(instruments), nn ) ]

for datatype in datatypes:
  print "\t\tfor %s..." % datatype
  # set file naming type
  type = '_'.join([ type_prefix, 'LOUDEST', datatype.upper(), 'EVENTS_BY', cp.get('printlc', 'ranking-stat').upper()])
  # cycle over all ifos times, creating different tables for each
  for on_instruments in distinct_instrument_sets:
    on_instruments = lsctables.ifos_from_instrument_set(on_instruments)
    # set output and extracted xml file names
    if cp.has_option('printlc', 'output-format'):
      summ_file_type = cp.get('printlc', 'output-format')
    else:
      summ_file_type = 'xml'
    summary_filename = '.'.join([
        '-'.join([ ''.join(on_instruments.split(',')), type + '_SUMMARY', experiment_start, experiment_duration ]),
        summ_file_type ])
    full_filename = '-'.join([ ''.join(on_instruments.split(',')), type, experiment_start, experiment_duration ])
    # set node options
    printlc_node = inspiral.PrintLCNode( printlc_job )
    printlc_node.set_category('printlc')
    printlc_node.set_tmp_space( tmp_space )
    printlc_node.set_input( input_database )
    printlc_node.set_output( summary_filename )
    printlc_node.set_extract_to_xml( '.'.join([ full_filename, 'xml' ]) )
    printlc_node.set_extract_to_database( '.'.join([ full_filename, 'sqlite' ]) )
    printlc_node.set_include_only_coincs( '[ALLin' + on_instruments + ']' )
    printlc_node.set_datatype( datatype )
    printlc_node.add_var_opt('time-column',time_column)

    dag.add_node( printlc_node )

    # create the minifollowups nodes
    prefix = '-'.join([ ''.join(on_instruments.split(',')), type_prefix ])
    suffix = re.sub(prefix + '_', '', summary_filename).rstrip('.xml')
  
    # Playground files don't exist before SQL, so no minifollowups
    if datatype == 'slide':
      minifup_node = inspiral.MiniFollowupsNode( minifup_ts_job )
    else:
      minifup_node = inspiral.MiniFollowupsNode( minifup_job )
    minifup_node.set_category('minifollowups')
    minifup_node.set_cache_file( ihope_cachefile )
    minifup_node.set_cache_string( type_prefix.split('_CAT_')[0] )
    minifup_node.set_prefix( prefix )
    minifup_node.set_suffix( suffix )
    minifup_node.set_input_xml( '.'.join([ full_filename, 'xml' ]) )
    minifup_node.set_input_xml_summary( summary_filename )
    minifup_node.set_output_html_table( re.sub('.xml', '.html', summary_filename ) )
    minifup_node.set_table_name( 'loudest_events' )

    minifup_node.add_parent( printlc_node )
    dag.add_node( minifup_node )
      
############################################################################
# Injection summary: Setup printsims, minifollowup, and printmissed nodes

print "\tsetting up injection summary nodes..."

# cycle over all the different types of injections
for sim_tag in sim_tags:
  
  # 
  # Printsims
  #

  # NOTE: Injection plots run using foreground triggers only from playground
  #       whenever relevant.
  datatype = 'playground'

  # set file naming type
  type = '_'.join([ sim_tag, veto_tag, 'QUIETEST_FOUND_COMPARED_TO', datatype.upper(), 'BY', cp.get('printsims', 'ranking-stat').upper()])
  # cycle over all ifos times, creating different tables for each
  for on_instruments in distinct_instrument_sets:
    on_instruments = lsctables.ifos_from_instrument_set(on_instruments)
    if cp.has_option('printsims', 'output-format'):
      summ_file_type = cp.get('printsims', 'output-format')
    else:
      summ_file_type = 'xml'
    summary_filename = '.'.join([
      '-'.join([ ''.join(on_instruments.split(',')), type + '_SUMMARY', experiment_start, experiment_duration ]),
      summ_file_type ])
    full_filename = '-'.join([ ''.join(on_instruments.split(',')), type, experiment_start, experiment_duration ])
    printsims_node = inspiral.PrintSimsNode( printsims_job )
    printsims_node.set_category('printsims')
    printsims_node.set_tmp_space( tmp_space )
    printsims_node.set_input( input_database )
    printsims_node.set_output( summary_filename )
    printsims_node.set_extract_to_xml( '.'.join([ full_filename, 'xml' ]) )
    printsims_node.set_include_only_coincs( '[ALLin' + on_instruments + ']' )
    printsims_node.set_comparison_datatype( datatype )
    printsims_node.set_sim_tag( sim_tag )
    printsims_node.set_output_format( summ_file_type )
    printsims_node.add_var_opt('time-column',time_column)
    
    dag.add_node( printsims_node )
    
    # create the minifollowups nodes
    prefix = '-'.join([ ''.join(on_instruments.split(',')), '_'.join([sim_tag, veto_tag]) ])
    suffix = re.sub(prefix+'_', '', summary_filename).rstrip('.xml')
 
    minifup_node = inspiral.MiniFollowupsNode( minifup_job )
    minifup_node.set_category('minifollowups')
    minifup_node.set_cache_file( ihope_cachefile )
    if sim_tag == 'ALLINJ':
        minifup_node.set_cache_string( 'INJ' )
    else:
        minifup_node.set_cache_string( sim_tag )
    minifup_node.set_prefix( prefix )
    minifup_node.set_suffix( suffix )
    minifup_node.set_input_xml( '.'.join([ full_filename, 'xml' ]) )
    minifup_node.set_input_xml_summary( summary_filename )
    minifup_node.set_output_html_table( re.sub('.xml', '.html', summary_filename ) )
    minifup_node.set_table_name( 'selected_found_injections' )
     
    minifup_node.add_parent( printsims_node )
    dag.add_node( minifup_node )
      
  #
  #  Printmissed
  #
  type = '_'.join([ sim_tag, veto_tag, 'CLOSEST_MISSED_INJECTIONS' ])
  # cycle over all ifos times, creating different tables for each
  for on_instruments in distinct_instrument_sets:
    on_instruments = lsctables.ifos_from_instrument_set(on_instruments)
    # set output and extracted xml file names
    if cp.has_option('printmissed', 'output-format'):
      summ_file_type = cp.get('printmissed', 'output-format')
    else:
      summ_file_type = 'xml'
    summary_filename = '.'.join([
      '-'.join([ ''.join(on_instruments.split(',')), type + '_SUMMARY', experiment_start, experiment_duration ]),
      summ_file_type ])

    # set node options
    printmissed_node = inspiral.PrintMissedNode( printmissed_job )
    printmissed_node.set_category('printmissed')
    printmissed_node.set_tmp_space( tmp_space )
    printmissed_node.set_input( input_database )
    printmissed_node.set_output( summary_filename )
    printmissed_node.set_include_only_coincs( '[ALLin' + on_instruments + ']' )
    printmissed_node.set_sim_tag( sim_tag )
    printmissed_node.set_output_format( summ_file_type )
  
    dag.add_node( printmissed_node )

    # create the minifollowups nodes
    prefix = '-'.join([ ''.join(on_instruments.split(',')), '_'.join([sim_tag, veto_tag]) ])
    suffix = re.sub(prefix + '_', '', summary_filename).rstrip('.xml')

    minifup_node = inspiral.MiniFollowupsNode( minifup_cm_job )
    minifup_node.set_category('minifollowups')
    minifup_node.set_cache_file( ihope_cachefile )
    if sim_tag == 'ALLINJ':
        minifup_node.set_cache_string( 'INJ' )
    else:
        minifup_node.set_cache_string( sim_tag )
    minifup_node.set_prefix( prefix )
    minifup_node.set_suffix( suffix )
    minifup_node.set_input_xml( summary_filename )
    minifup_node.set_output_html_table( re.sub('.xml', '.html', summary_filename ) )
    minifup_node.set_table_name( 'close_missed_injections' )
  
    minifup_node.add_parent( printmissed_node )
    dag.add_node( minifup_node )

  # create the plotfm node for each plotfm job
  # only do this once per sim_tag as a plotfm job covers all instrument sets
  for plotfm_tag, plotfm_job in plotfm_jobs:
    plotfm_node = inspiral.PlotFMNode( plotfm_job )
    plotfm_node.set_category('plotfm')
    plotfm_node.set_tmp_space( tmp_space )
    plotfm_node.add_file_arg( input_database )
    plotfm_node.set_sim_tag( sim_tag )
    plotfm_node.set_user_tag( '_'.join([ plotfm_tag, sim_tag, veto_tag ]) )
          
    dag.add_node( plotfm_node )


############################################################################
# Plotting: Generate all result plots
  
print "\tsetting up plotting jobs..."

# Write plotslides node
print "\t\tcreating plotslides node..."
plotslides_node = inspiral.PlotSlidesNode( plotslides_job )

plotslides_node.set_category('plotslides')
plotslides_node.set_tmp_space( tmp_space )
plotslides_node.set_input( input_database )
plotslides_node.set_user_tag( type_prefix )
dag.add_node( plotslides_node )

# create plotcumhist node
print "\t\tcreating plotcumhist node..."
plotcumhist_node = inspiral.PlotCumhistNode( plotcumhist_job )
plotcumhist_node.set_tmp_space( tmp_space )
plotcumhist_node.set_input( input_database )
plotcumhist_node.set_user_tag( type_prefix )
dag.add_node( plotcumhist_node )

# Write plotifar nodes for different datatypes
print "\t\tcreating plotifar node for datatypes:"
for datatype in ['all_data', 'playground']: 
  print "\t\t\t%s..." % datatype
  plotifar_node = inspiral.PlotIfarNode( plotifar_job )
  plotifar_node.set_category('plotifar')
  plotifar_node.set_tmp_space( tmp_space )
  plotifar_node.set_input( input_database )
  plotifar_node.set_datatype( datatype )
  plotifar_node.set_user_tag( type_prefix )
    
  dag.add_node( plotifar_node )
 
  
##############################################################################
##############################################################################
# Final Step: Write the DAG

print "Writing DAG and sub files..."

# set max-jobs: currently, only minifollowups is set
dag.add_maxjobs_category('minifollowups', 15)

dag.write_sub_files()
dag.write_script()
dag.write_dag()

# write process end time to log file and write log file
process.set_process_end_time(proc_id)
utils.write_filename(logdoc, basename+'.log.xml', xsl_file = "ligolw.xsl")

print "Finished!"
print "Now run:\n\tcondor_submit_dag %s" % os.path.basename(dag.get_dag_file())

sys.exit(0)

back to top