#!/usr/bin/env python

"""Detect compact-binary-merger gravitational-wave signals in low latency. This
program takes a fixed template bank and performs matched filtering on a
continuous stream of strain data in fixed short blocks of time. It is capable
of reading from low latency data directories on the LDG clusters. Work is
parallelized through the use of MPI, so this script should typically be
launched with mpirun.

See https://arxiv.org/abs/1805.11174 for an overview."""
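
# An example invocation might look like the following. This is illustrative
# only: the channel names and numerical values are placeholders, and several
# required options are omitted for brevity.
#
#   mpirun -n 8 pycbc_live \
#       --bank-file bank.hdf \
#       --channel-name H1:GDS-CALIB_STRAIN L1:GDS-CALIB_STRAIN \
#       --sample-rate 2048 --low-frequency-cutoff 20 \
#       --analysis-chunk 8 \
#       --psd-samples 30 --psd-segment-length 4 \
#       --output-path ./output ...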

import sys
import argparse
import numpy
import pycbc
import logging
import cProfile
import h5py
import lal
import json
from time import time
import os.path
import itertools
import platform
from mpi4py import MPI as mpi
from pycbc.pool import BroadcastPool
from pycbc import fft, version, waveform, scheme, makedir
from pycbc.types import MultiDetOptionAction
from pycbc.filter import LiveBatchMatchedFilter, compute_followup_snr_series
from pycbc.filter import followup_event_significance
from pycbc.strain import StrainBuffer
from pycbc.events.ranking import newsnr
from pycbc.events.coinc import LiveCoincTimeslideBackgroundEstimator as Coincer
from pycbc.events.single import LiveSingleFarThreshold
from pycbc.io.live import SingleCoincForGraceDB
import pycbc.waveform.bank
from pycbc.vetoes.sgchisq import SingleDetSGChisq


def ptof(p, ft):
    """Convert p-value to FAR via foreground time `ft`.
    """
    return numpy.log1p(-p) / -ft

def ftop(f, ft):
    """Convert FAR to p-value via foreground time `ft`.
    """
    return 1 - numpy.exp(-ft * f)
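
# ptof and ftop are inverses of each other, assuming Poisson-distributed
# background events over the time `ft`:
#   p = 1 - exp(-ft * f)   <=>   f = -log(1 - p) / ft
# For example (illustrative numbers), ftop(0.01, 10.) ~= 0.0952 and
# ptof(0.0952, 10.) ~= 0.01.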

def combine_ifar_pvalue(ifar, pvalue, livetime):
    """Convert original IFAR to p-value and combine with followup p-value.
    """
    from scipy.stats import combine_pvalues
    # NB units of ifar and livetime must be the same
    _, pnew = combine_pvalues([ftop(1. / ifar, livetime), pvalue])
    nifar = 1. / ptof(pnew, livetime)
    # take max of original IFAR and combined IFAR & apply trials factor
    return numpy.maximum(ifar, nifar) / 2.
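
# Illustrative usage with made-up numbers: combine_ifar_pvalue(50., 0.01, 0.02)
# converts the original IFAR of 50 (same units as the livetime of 0.02) to a
# p-value, combines it with the followup p-value of 0.01 via Fisher's method
# (scipy.stats.combine_pvalues), converts back to an IFAR, and returns the
# larger of the original and combined IFARs divided by the trials factor of 2.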

class LiveEventManager(object):
    def __init__(self, output_path,
                       use_date_prefix=False,
                       ifar_upload_threshold=None,
                       pval_livetime=None,
                       enable_gracedb_upload=False,
                       gracedb_testing=True,
                 ):
        self.path = output_path

        # Figure out what we are supposed to process within the pool of MPI processes
        self.comm = mpi.COMM_WORLD
        self.size = self.comm.Get_size()
        self.rank = self.comm.Get_rank()

        self.use_date_prefix = use_date_prefix
        self.ifar_upload_threshold = ifar_upload_threshold
        self.pvalue_livetime = pval_livetime
        self.gracedb_testing = gracedb_testing
        self.enable_gracedb_upload = enable_gracedb_upload

    def commit_results(self, results):
        self.comm.gather(results, root=0)

    def barrier(self):
        self.comm.Barrier()

    def barrier_status(self, status):
        return self.comm.allreduce(status, op=mpi.LAND)

    def gather_results(self):
        """ Collect results from the mpi subprocesses and collate them into
        contiguous sets of arrays.
        """

        if self.rank == 0:
            all_results = self.comm.gather(None, root=0)
            data_ends = [a[1] for a in all_results if a is not None]
            results = [a[0] for a in all_results if a is not None]

            combined = {}
            for ifo in results[0]:

                # skip this detector if any worker returned an invalid result
                if any(r[ifo] is False for r in results):
                    continue
                combined[ifo] = {}
                for key in results[0][ifo]:
                    combined[ifo][key] = numpy.concatenate([r[ifo][key] for r in results])

            return combined, data_ends[0]
        else:
            raise RuntimeError("Not root process")

    def compute_followup_data(self, ifos, triggers, data_readers, bank,
                              followup_ifos=None):
        """Figure out which of the followup detectors are usable, and compute
        SNR time series for all the available detectors.
        """
        out = {}
        followup_ifos = [] if followup_ifos is None else followup_ifos

        template_id = triggers['foreground/' + ifos[0] + '/template_id']
        htilde = bank[template_id]

        coinc_times = {ifo: triggers['foreground/' + ifo + '/end_time'] for ifo in ifos}

        # Get the SNR series for the ifos that made the initial coinc
        for ifo in ifos:
            # NOTE we only check the state/DQ of followup IFOs here.
            # IFOs producing the coincidence are assumed to also
            # produce valid SNR series.
            snr_series = compute_followup_snr_series(
                    data_readers[ifo], htilde, coinc_times[ifo],
                    check_state=False)

            if snr_series is not None:
                out[ifo] = {'snr_series': snr_series}

        # Determine if the other ifos can contribute to the coincident event
        for ifo in followup_ifos:
            snr_series, ptime, pvalue, sigma2 = followup_event_significance(
                    ifo, data_readers[ifo], bank, template_id, coinc_times)
            if snr_series is not None:
                out[ifo] = {'snr_series': snr_series}
                self.combine_far_with_followup(ifos[0], ifo, triggers,
                                               snr_series, ptime, pvalue,
                                               sigma2)

        return out

    def combine_far_with_followup(self, coinc_ifo, ifo, triggers, snr_series,
                                  ptime, pvalue, sigma2):
        # Calculate new ifar
        triggers['foreground/ifar'] = combine_ifar_pvalue(triggers['foreground/ifar'],
                                                          pvalue, self.pvalue_livetime)

        # Copy the common fields from the other detector
        # ignore fields that contain detector-specific data
        fields_to_ignore = set(['end_time', 'snr', 'stat', 'coa_phase',
                                'chisq', 'chisq_dof', 'sg_chisq', 'sigmasq'])
        for key in set(triggers):
            if 'foreground/{}/'.format(coinc_ifo) in key:
                _, _, name = key.split('/')
                if name in fields_to_ignore:
                    continue
                triggers['foreground/{}/{}'.format(ifo, name)] = triggers[key]

        # Set the detector-specific fields for which we have data
        snr_series_peak = snr_series.at_time(ptime, nearest_sample=True)
        base = 'foreground/{}/'.format(ifo)
        triggers[base + 'end_time'] = float(ptime)
        triggers[base + 'snr'] = triggers[base + 'stat'] = abs(snr_series_peak)
        triggers[base + 'coa_phase'] = numpy.angle(snr_series_peak)
        triggers[base + 'sigmasq'] = sigma2

    def check_coincs(self, ifos, coinc_results, psds, f_low,
                     data_readers, bank):
        """ Perform any followup and save zerolag triggers to a coinc xml file
        """
        # check for a hardware injection
        for ifo in ifos:
            if data_readers[ifo].near_hwinj():
                coinc_results['HWINJ'] = True
                break

        if 'foreground/ifar' in coinc_results:
            logging.info('computing followup data for coinc')

            coinc_ifos = coinc_results['foreground/type'].split('-')
            followup_ifos = list(set(ifos) - set(coinc_ifos))

            double_ifar = coinc_results['foreground/ifar']
            if double_ifar < args.ifar_double_followup_threshold:
                coinc_results['foreground/NO_FOLLOWUP'] = True
                return

            fud = self.compute_followup_data(coinc_ifos, coinc_results,
                                             data_readers, bank, followup_ifos)

            live_ifos = [ifo for ifo in fud if 'snr_series' in fud[ifo]]

            # FIXME delta_t can vary due to rounding errors, force them to be identical
            fix_delta_t = fud[coinc_ifos[0]]['snr_series'].delta_t
            for ifo in live_ifos:
                fud[ifo]['snr_series']._delta_t = fix_delta_t

            event = SingleCoincForGraceDB(live_ifos, coinc_results, bank=bank,
                                          psds=psds, followup_data=fud,
                                          low_frequency_cutoff=f_low,
                                          channel_names=args.channel_name)

            end_time = int(coinc_results['foreground/%s/end_time' % coinc_ifos[0]])
            fname = os.path.join(self.path, 'coinc-%s.xml.gz' % end_time)
            logging.info('Coincident candidate! Saving as %s', fname)
            comments = ['using ranking statistic: %s' % args.background_statistic]

            ifar = coinc_results['foreground/ifar']
            if self.enable_gracedb_upload and self.ifar_upload_threshold < ifar:
                event.upload(fname, gracedb_server=args.gracedb_server,
                             testing=self.gracedb_testing,
                             extra_strings=comments)
            else:
                event.save(fname)

    def check_singles(self, results, data_reader, psds, f_low):
        active = [k for k in results if results[k] is not None]
        if len(active) == 1:
            ifo = active[0]
            single = sngl_estimator[ifo].check(results[ifo], data_reader[ifo])

            if single is not None:
                fud = self.compute_followup_data([ifo], single, data_reader,
                                                 bank, [])
                event = SingleCoincForGraceDB([ifo], single, bank=bank,
                                              psds=psds, followup_data=fud,
                                              low_frequency_cutoff=f_low,
                                              channel_names=args.channel_name)

                end_time = int(single['foreground/%s/end_time' % ifo])
                fname = 'single-%s-%s.xml.gz' % (ifo, end_time)
                fname = os.path.join(self.path, fname)
                logging.info('Single-detector candidate! Saving as %s', fname)
                if args.enable_single_detector_upload:
                    event.upload(fname, gracedb_server=args.gracedb_server,
                                 testing=self.gracedb_testing)
                else:
                    event.save(fname)

    def dump(self, results, name, store_psd=False, time_index=None,
             store_loudest_index=False, raw_results=None):
        """ Save the results from this time block to an hdf output file """
        if self.use_date_prefix:
            tm = lal.GPSToUTC(int(time_index))
            subdir = '{:04d}_{:02d}_{:02d}'.format(tm[0], tm[1], tm[2])
            makedir(os.path.join(self.path, subdir))
            fname = os.path.join(self.path, subdir, name) + '.hdf'
        else:
            makedir(self.path)
            fname = os.path.join(self.path, name) + '.hdf'

        with h5py.File(fname, 'w') as f:
            f.attrs['pycbc_version'] = version.git_verbose_msg
            f.attrs['command_line'] = sys.argv
            f.attrs['num_live_detectors'] = len(self.live_detectors)

            for ifo in results:
                for k in results[ifo]:
                    f['%s/%s' % (ifo, k)] = results[ifo][k]

            for key in raw_results:
                f[key] = raw_results[key]

            if store_loudest_index:
                for ifo in results:
                    if 'snr' in results[ifo]:
                        s = numpy.array(results[ifo]['snr'], ndmin=1)
                        c = numpy.array(results[ifo]['chisq'], ndmin=1)
                        nsnr = numpy.array(newsnr(s, c), ndmin=1) if len(s) > 0 else []

                        # loudest by newsnr
                        nloudest = numpy.argsort(nsnr)[::-1][0:store_loudest_index]

                        # loudest by snr
                        sloudest = numpy.argsort(s)[::-1][0:store_loudest_index]
                        f[ifo]['loudest'] = numpy.union1d(nloudest, sloudest)

        if store_psd:
            for ifo in store_psd:
                if store_psd[ifo] is not None:
                    store_psd[ifo].save(fname, group='%s/psd' % ifo)


parser = argparse.ArgumentParser(description=__doc__)
pycbc.waveform.bank.add_approximant_arg(parser)
parser.add_argument('--verbose', action='store_true')
parser.add_argument('--version', action='version', version=version.git_verbose_msg)
parser.add_argument('--bank-file', required=True,
                    help="Template bank file in XML or HDF format")
parser.add_argument('--low-frequency-cutoff', help="low frequency cutoff", type=int)
parser.add_argument('--sample-rate', help="output sample rate", type=int)
parser.add_argument('--chisq-bins', help="Number of chisq bins")
parser.add_argument('--analysis-chunk', type=int, required=True,
                    help="Amount of data, in seconds, over which to produce "
                         "triggers in each analysis block")

parser.add_argument('--snr-threshold', type=float,
                    help='SNR threshold for generating a trigger')
parser.add_argument('--snr-abort-threshold', type=float)

parser.add_argument('--channel-name', action=MultiDetOptionAction, nargs='+',
                    required=True)
parser.add_argument('--state-channel', action=MultiDetOptionAction, nargs='+',
                    help="Channel containing frame status information. Used "
                         "to determine when to analyze the hoft data. This somewhat "
                         "corresponds to CAT1 information")
parser.add_argument('--analyze-flags', action=MultiDetOptionAction, nargs='+',
                    help='The flags that must be in the "good" state to analyze data')
parser.add_argument('--data-quality-channel', action=MultiDetOptionAction, nargs='+',
                    help="Channel containing data quality information. Used "
                         "to determine when hoft may be suspect and may be used "
                         "to veto triggers or not analyze a segment of data. This "
                         "roughly corresponds to CAT2 information")
parser.add_argument('--data-quality-flags', action=MultiDetOptionAction, nargs='+',
                    help='Flags used to determine when to throw triggers away. '
                         'For each detector, give a comma-separated list of flags.')
parser.add_argument('--data-quality-padding', type=float, default=0,
                    help='Time in seconds around a bad dq time to additionally remove triggers')
parser.add_argument('--frame-src', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--frame-type', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--force-update-cache', action='store_true')
parser.add_argument('--highpass-frequency', type=float,
                    help="Frequency to apply highpass filtering")
parser.add_argument('--highpass-reduction', type=float,
                    help="DB to reduce low frequencies")
parser.add_argument('--highpass-bandwidth', type=float,
                    help="Width of the highpass turnover region in Hz")
parser.add_argument('--psd-recalculate-difference', type=float, default=.01)
parser.add_argument('--psd-abort-difference', type=float, default=.20)
parser.add_argument('--psd-samples', type=int, required=True,
                    help="Number of PSD segments to use in the rolling estimate")
parser.add_argument('--psd-segment-length', type=int, required=True,
                    help="Length in seconds of each PSD segment")
parser.add_argument('--psd-inverse-length', type=float,
                    help="Length in time for the equivalent FIR filter")
parser.add_argument('--trim-padding', type=float, default=0.25,
                    help="Padding around the overwhitened analysis block")
parser.add_argument("--enable-bank-start-frequency", action='store_true',
                    help="Read the starting frequency of template waveforms"
                         " from the template bank")
parser.add_argument('--autogating-threshold', type=float, default=100)
parser.add_argument('--autogating-pad', type=float, default=.25)
parser.add_argument('--autogating-window', type=float, default=0.5)
parser.add_argument('--autogating-cluster', type=float, default=.25)

parser.add_argument('--sync', action='store_true')
parser.add_argument('--increment-update-cache', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--frame-read-timeout', type=float, default=30)
parser.add_argument('--increment', type=int, default=8)

parser.add_argument('--start-time', type=int, default=None,
                    help='Start the analysis at the given GPS time')
parser.add_argument('--end-time', type=int, default=numpy.inf,
                    help='Stop the analysis at the given GPS time')

parser.add_argument('--output-path', required=True,
                    help='Path to a directory to store results in')
parser.add_argument('--output-status', type=str, metavar='PATH',
                    help='If given, PyCBC Live will periodically write JSON '
                         'status info to PATH, so the analysis can be '
                         'monitored via Nagios')
parser.add_argument('--day-hour-output-prefix', action='store_true')
parser.add_argument('--store-psd', action='store_true')
parser.add_argument('--output-background', type=str, nargs='+',
                    help='Takes a period in seconds and a file path and dumps '
                         'the coinc backgrounds to that path with that period')

parser.add_argument('--newsnr-threshold', type=float, default=0)
parser.add_argument('--max-batch-size', type=int, default=2**27)
parser.add_argument('--store-loudest-index', type=int, default=0)
parser.add_argument('--max-psd-abort-distance', type=float, default=numpy.inf)
parser.add_argument('--min-psd-abort-distance', type=float, default=-numpy.inf)
parser.add_argument('--max-triggers-in-batch', type=int)
parser.add_argument('--max-length', type=float,
                    help='Maximum duration of templates, used to set the data buffer size')

parser.add_argument('--enable-profiling', type=int, metavar='RANK',
                    help='Dump out profiling information from the MPI process'
                         ' with given rank at the end of program execution')

parser.add_argument('--enable-background-estimation', default=False, action='store_true')
parser.add_argument('--ifar-upload-threshold', type=float, required=True,
                    help='Inverse-FAR threshold for uploading coincident '
                         'triggers to GraceDB, in years.')
parser.add_argument('--ifar-double-followup-threshold', type=float, required=True,
                    help='Inverse-FAR threshold to follow up double coincs '
                         'with additional detectors')
parser.add_argument('--enable-gracedb-upload', action='store_true', default=False,
                    help='Upload triggers to GraceDB')
parser.add_argument('--file-prefix', default='Live')
parser.add_argument('--enable-production-gracedb-upload', action='store_true', default=False,
                    help='Do not mark triggers uploaded to GraceDB as test '
                         'events. This option should *only* be enabled in '
                         'production analyses!')
parser.add_argument('--enable-single-detector-upload', action='store_true', default=False)
parser.add_argument('--pvalue-combination-livetime', type=float, required=True,
                    help="Livetime used for p-value combination with followup "
                         "detectors, in years")
parser.add_argument('--enable-single-detector-background', action='store_true', default=False)
parser.add_argument('--round-start-time', type=int, metavar='X',
                    help="Round up the start time to the nearest multiple of X"
                         " seconds. This is useful for forcing agreement "
                         " with frame file production.")
parser.add_argument('--gracedb-server', metavar='URL',
                    help='URL of GraceDB server API for uploading events. '
                         'If not provided, the default URL is used.')
parser.add_argument('--size-override', type=int, metavar='N',
                    help="Override the internal MPI size layout. "
                         " Useful for debugging and running a portion of a bank")
parser.add_argument('--fftw-planning-limit', type=float,
                    help="Time in seconds to allow for a plan to be created")

scheme.insert_processing_option_group(parser)
LiveSingleFarThreshold.insert_args(parser)
fft.insert_fft_option_group(parser)
Coincer.insert_args(parser)
SingleDetSGChisq.insert_option_group(parser)
args = parser.parse_args()
scheme.verify_processing_options(args, parser)
fft.verify_fft_options(args, parser)

if args.output_background is not None and len(args.output_background) != 2:
    parser.error('--output-background takes two parameters: period and path')

log_format = "%(asctime)s {0} %(message)s".format(platform.node())
pycbc.init_logging(args.verbose, format=log_format)

ctx = scheme.from_cli(args)
fft.from_cli(args)

sr = args.sample_rate
flow = args.low_frequency_cutoff

# Approximate guess of the total padding
valid_pad = args.analysis_chunk
total_pad = args.trim_padding * 2 + valid_pad
bank = waveform.LiveFilterBank(args.bank_file, sr, total_pad,
                       low_frequency_cutoff=None if args.enable_bank_start_frequency else flow,
                       approximant=args.approximant,
                       increment=args.increment)

evnt = LiveEventManager(args.output_path,
                        use_date_prefix=args.day_hour_output_prefix,
                        ifar_upload_threshold=args.ifar_upload_threshold,
                        pval_livetime=args.pvalue_combination_livetime,
                        enable_gracedb_upload=args.enable_gracedb_upload,
                        gracedb_testing=not args.enable_production_gracedb_upload,
                       )

sg_chisq = SingleDetSGChisq.from_cli(args, bank, args.chisq_bins)

if args.size_override:
    evnt.size = args.size_override

# ifos used for primary analysis (only two at the moment)
ifos = set(args.channel_name.keys())
logging.info('Running with %s', ', '.join(sorted(ifos)))

# make sure we can talk to GraceDB
if evnt.rank == 0 and args.enable_gracedb_upload:
    logging.info('Testing access to GraceDB')
    from ligo.gracedb.rest import GraceDb
    gdb_client = GraceDb(args.gracedb_server) if args.gracedb_server else GraceDb()
    gdb_client.ping()
    del gdb_client

# Enter the processing scheme context: worker ranks do the actual filtering,
# while rank 0 collects results and manages candidate events.
with ctx:
    try:
        # Import system wisdom.
        if args.fftw_import_system_wisdom:
            fft.fftw.import_sys_wisdom()

        # Read specified user-provided wisdom files
        if args.fftw_input_float_wisdom_file is not None:
            fft.fftw.import_single_wisdom_from_filename(args.fftw_input_float_wisdom_file)

        if args.fftw_input_double_wisdom_file is not None:
            fft.fftw.import_double_wisdom_from_filename(args.fftw_input_double_wisdom_file)

        if args.fftw_planning_limit:
            fft.fftw.set_planning_limit(args.fftw_planning_limit)
    except (ValueError, RuntimeError) as e:
        print(e)
        exit()

    maxlen = args.psd_segment_length * (args.psd_samples // 2 + 1)
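    # maxlen is the length of strain, in seconds, that must be buffered for
    # the rolling PSD estimate; worker ranks may enlarge it below to fit their
    # longest template, and --max-length overrides it entirely.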
    if evnt.rank > 0:
        bank.table.sort(order='mchirp')
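        # Distribute the (mchirp-sorted) templates among the worker ranks
        # 1..size-1 in an interleaved fashion: rank r filters templates
        # r-1, r-1 + (size-1), r-1 + 2*(size-1), ...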
        waveforms = list(bank[evnt.rank-1::evnt.size-1])
        lengths = numpy.array([1.0 / wf.delta_f for wf in waveforms])
        psd_len = args.psd_segment_length * (args.psd_samples // 2 + 1)
        maxlen = max(lengths.max(), psd_len)
        mf = LiveBatchMatchedFilter(waveforms, args.snr_threshold,
                                    args.chisq_bins, sg_chisq,
                                    snr_abort_threshold=args.snr_abort_threshold,
                                    newsnr_threshold=args.newsnr_threshold,
                                    max_triggers_in_batch=args.max_triggers_in_batch,
                                    maxelements=args.max_batch_size)

    # Synchronize start time if not provided on the command line
    if not args.start_time:
        evnt.barrier()
        tnow = lal.GPSTimeNow() if evnt.rank == 0 else None
        args.start_time = evnt.comm.bcast(tnow, root=0)

    if args.round_start_time:
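        # Round the start time up to the next multiple of --round-start-time;
        # e.g. with --round-start-time 16, a start time of 1234567890 becomes
        # 1234567904.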
        args.start_time = int(args.start_time / args.round_start_time + 1) * args.round_start_time
        logging.info('Starting from: %s', args.start_time)

    # initialize the data readers for all detectors
    if args.max_length is not None:
        maxlen = args.max_length
    maxlen = int(maxlen)
    data_reader = {ifo: StrainBuffer.from_cli(ifo, args, maxlen)
                   for ifo in ifos}

    # create single-detector background "estimators"
    if args.enable_single_detector_background and evnt.rank == 0:
        sngl_estimator = {ifo: LiveSingleFarThreshold.from_cli(args, ifo)
                          for ifo in ifos}

    # Create double coincident background estimator for every combo
    if args.enable_background_estimation and evnt.rank == 0:
        ifo_combos = itertools.combinations(ifos, 2)
        estimators = []
        for combo in ifo_combos:
            logging.info('Will calculate %s background', combo)
            estimators.append(Coincer.from_cli(args,
                              len(bank), args.analysis_chunk, list(combo)))
                              
        my_coinc_id = 999999
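        # Each process in the coinc BroadcastPool below is permanently bound to
        # one detector-pair estimator via set_coinc_id/allmap, so get_coinc and
        # output_background act on that process's own estimator through the
        # module-level my_coinc_id.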
        def set_coinc_id(i):
            global my_coinc_id
            my_coinc_id = i
        
        def get_coinc(results):
            c = estimators[my_coinc_id]
            r = c.add_singles(results)
            logging.info('Coincs %i: %s-%s: %s in cbuffer', my_coinc_id,
                         c.ifos[0], c.ifos[1], c.coincs.index)
            return r

        def output_background(_):
            estim = estimators[my_coinc_id]
            bg_time = estim.background_time / lal.YRJUL_SI
            return estim.ifos, estim.coincs.data, bg_time
            
        coinc_pool = BroadcastPool(len(estimators))
        coinc_pool.allmap(set_coinc_id, range(len(estimators)))

    logging.info('%s: Starting...', evnt.rank)

    if args.enable_profiling is not None and evnt.rank == args.enable_profiling:
        pr = cProfile.Profile()
        pr.enable()

    # main analysis loop
    data_end = lambda: next(iter(data_reader.values())).end_time
    last_bg_dump_time = int(data_end())
    while data_end() < args.end_time:
        t1 = time()
        logging.info('%s: Analyzing from %s', evnt.rank, data_end())
        results = {}
        evnt.live_detectors = set()

        for ifo in ifos:
            results[ifo] = False
            status = data_reader[ifo].advance(valid_pad, timeout=args.frame_read_timeout)

            if status is True:
                status = data_reader[ifo].recalculate_psd()

            if data_reader[ifo].psd is not None:
                dist = data_reader[ifo].psd.dist
                if dist < args.min_psd_abort_distance or dist > args.max_psd_abort_distance:
                    logging.info("%s PSD dist %s outside acceptable range [%s, %s]",
                                 ifo, dist, args.min_psd_abort_distance,
                                 args.max_psd_abort_distance)
                    status = False

            if status is True:
                evnt.live_detectors.add(ifo)
                if evnt.rank > 0:
                    logging.info('%s: Filtering %s', evnt.rank, ifo)
                    results[ifo] = mf.process_data(data_reader[ifo])
            else:
                logging.info('Insufficient data for %s analysis', ifo)

        if evnt.rank > 0:
            evnt.commit_results((results, data_end()))
        else:
            psds = {ifo: data_reader[ifo].psd for ifo in data_reader if data_reader[ifo].psd is not None}

            # Collect together the single detector triggers
            if evnt.size > 1:
                results, valid_end = evnt.gather_results()

            # protect from situations where master and worker nodes disagree
            # on the status of detectors: the master process has the last word
            for ifo in set(results.keys()) - evnt.live_detectors:
                results.pop(ifo)

            # Veto single detector triggers that fail the DQ vector
            for ifo in results:
                if data_reader[ifo].dq is None:
                    continue
                logging.info("Checking %s's DQ vector", ifo)
                start = data_reader[ifo].start_time
                times = results[ifo]['end_time']
                idx = data_reader[ifo].dq.indices_of_flag(
                        start, valid_pad, times,
                        padding=data_reader[ifo].dq_padding)
                logging.info('Keeping %d/%d %s triggers',
                             len(idx), len(times), ifo)
                for key in results[ifo]:
                    if len(results[ifo][key]):
                        results[ifo][key] = results[ifo][key][idx]

            # Look for coincident triggers and do background estimation
            if args.enable_background_estimation:
                coinc_results = coinc_pool.broadcast(get_coinc, results)
                
                # Pick the best coinc in this chunk
                best_coinc = Coincer.pick_best_coinc(coinc_results)

                evnt.check_coincs(results.keys(), best_coinc,
                                  psds, args.low_frequency_cutoff,
                                  data_reader, bank)

            # Check for singles if we don't have coinc time
            if args.enable_single_detector_background:
                evnt.check_singles(results, data_reader, psds,
                                   args.low_frequency_cutoff)

            # map the results file to an hdf file
            prefix = '{}-{}-{}-{}'.format(''.join(sorted(ifos)),
                                          args.file_prefix,
                                          data_end() - args.analysis_chunk,
                                          valid_pad)

            evnt.dump(results, prefix, time_index=data_end(),
                      store_psd=(psds if args.store_psd else False),
                      store_loudest_index=args.store_loudest_index,
                      raw_results=best_coinc)

            # dump the background if needed
            if args.output_background and \
                    data_end() - last_bg_dump_time > float(args.output_background[0]):
                last_bg_dump_time = int(data_end())
                bg_dists = coinc_pool.broadcast(output_background, None)
                bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(''.join(sorted(ifos)),
                                                           last_bg_dump_time)
                bg_fn = os.path.join(args.output_background[1], bg_fn)
                with h5py.File(bg_fn, 'w') as bgf:
                    for bg_ifos, bg_data, bg_time in bg_dists:
                        ds = bgf.create_dataset(','.join(sorted(bg_ifos)),
                                                data=bg_data, compression='gzip')
                        ds.attrs['background_time'] = bg_time
                    bgf.attrs['gps_time'] = last_bg_dump_time

            logging.info('Finished Analyzing up to %s', data_end())

        if args.sync:
            evnt.barrier()
        tdiff = time() - t1
        lag = float(lal.GPSTimeNow() - data_end())
        logging.info('%s: Took %1.2f, duty factor of %.2f, '
                     'lag %.2f s, %d live detectors',
                     evnt.rank, tdiff, tdiff / valid_pad, lag,
                     len(evnt.live_detectors))

        if args.output_status is not None and evnt.rank == 0:
            if lag > 120:
                status_intervals = [{'num_status': 2,
                                     'txt_status': 'CRITICAL: lag greater than 2 min',
                                     'start_sec': 0}]
            else:
                status_intervals = [{'num_status': 0,
                                     'txt_status': 'OK: No reported problems',
                                     'start_sec': 0},
                                    {'num_status': 1,
                                     'txt_status': 'WARNING: last report between 2 and 4 min ago',
                                     'start_sec': 120}]
            status_intervals.append({'num_status': 2,
                                     'txt_status': 'CRITICAL: last report more than 4 min ago',
                                     'start_sec': 240})
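            # start_sec is the age of this status file at which each interval
            # applies, so a report that stops updating automatically degrades
            # to WARNING and then CRITICAL on the monitoring side.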
            status = {'author': 'Tito Dal Canton',
                      'email': 'tito.canton@ligo.org',
                      'created_gps': int(lal.GPSTimeNow()),
                      'status_intervals': status_intervals}
            try:
                with open(args.output_status, 'w') as status_fp:
                    json.dump(status, status_fp)
            except IOError:
                logging.error('I/O error writing status JSON file! '
                              'Hopefully it works next time')

if evnt.rank == 1:
    if args.fftw_output_float_wisdom_file:
        fft.fftw.export_single_wisdom_to_filename(args.fftw_output_float_wisdom_file)

    if args.fftw_output_double_wisdom_file:
        fft.fftw.export_double_wisdom_to_filename(args.fftw_output_double_wisdom_file)

if args.enable_profiling is not None and evnt.rank == args.enable_profiling:
    pr.dump_stats('profiling_rank_{:03d}'.format(evnt.rank))