#!/usr/bin/env python

"""Detect compact-binary-merger gravitational-wave signals in low latency. This
program takes a fixed template bank and performs matched filtering on a
continuous stream of strain data in fixed short blocks of time. It is capable
of reading from low latency data directories on the LDG clusters. Work is
parallelized through the use of MPI, so this script should typically be
launched with mpirun.

See https://arxiv.org/abs/1805.11174 for an overview."""
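
# A minimal illustrative invocation (hypothetical option values; several more
# required options are omitted here -- see the argument parser below):
#   mpirun -n 4 pycbc_live --bank-file bank.hdf --low-frequency-cutoff 20 \
#       --sample-rate 2048 --analysis-chunk 8 --output-path ./output ...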

import sys
import argparse
import numpy
import pycbc
import logging
import cProfile
import h5py
import lal
import json
from time import time
import os.path
import itertools
import platform
import distutils.spawn
import subprocess
from mpi4py import MPI as mpi
from pycbc.pool import BroadcastPool
from pycbc import fft, version, waveform, scheme, makedir
from pycbc.types import MultiDetOptionAction
from pycbc.filter import LiveBatchMatchedFilter, compute_followup_snr_series
from pycbc.filter import followup_event_significance
from pycbc.strain import StrainBuffer
from pycbc.events.ranking import newsnr
from pycbc.events.coinc import LiveCoincTimeslideBackgroundEstimator as Coincer
from pycbc.events.single import LiveSingle
from pycbc.io.live import SingleCoincForGraceDB
from pycbc.io.hdf import recursively_save_dict_contents_to_group
import pycbc.waveform.bank
from pycbc.vetoes.sgchisq import SingleDetSGChisq
from pycbc.waveform.waveform import props
from pycbc import mchirp_area


def ppdets(ifos):
    """Pretty-print a list (or set) of detectors: return a string listing
    the given detectors alphabetically and separated by commas.
    """
    if ifos:
        return ', '.join(sorted(ifos))
    return 'no detectors'

def ptof(p, ft):
    """Convert p-value to FAR via foreground time `ft`.
    """
    return numpy.log1p(-p) / -ft

def ftop(f, ft):
    """Convert FAR to p-value via foreground time `ft`.
    """
    return 1 - numpy.exp(-ft * f)

def combine_ifar_pvalue(ifar, pvalue, livetime):
    """Convert original IFAR to p-value and combine with followup p-value.
    """
    from scipy.stats import combine_pvalues
    # NB units of ifar and livetime must be the same
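    # scipy's combine_pvalues uses Fisher's method by default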
    _, pnew = combine_pvalues([ftop(1. / ifar, livetime), pvalue])
    nifar = 1. / ptof(pnew, livetime)
    # take the max of the original and combined IFAR, then apply a trials
    # factor of 2
    return numpy.maximum(ifar, nifar) / 2.


class LiveEventManager(object):
    def __init__(self, output_path, mc_area_args,
                 use_date_prefix=False,
                 ifar_upload_threshold=None,
                 pval_livetime=None,
                 enable_gracedb_upload=False,
                 gracedb_testing=True,
                 run_snr_optimization=False):
        self.path = output_path
        self.mc_area_args = mc_area_args

        # Figure out what we are supposed to process within the pool of MPI processes
        self.comm = mpi.COMM_WORLD
        self.size = self.comm.Get_size()
        self.rank = self.comm.Get_rank()

        self.use_date_prefix = use_date_prefix
        self.ifar_upload_threshold = ifar_upload_threshold
        self.pvalue_livetime = pval_livetime
        self.gracedb_testing = gracedb_testing
        self.enable_gracedb_upload = enable_gracedb_upload

        self.run_snr_optimization = run_snr_optimization

        if self.rank == 0:
            # preestimate the number of CPU cores that we can afford giving to
            # followup processes without slowing down the main search
            available_cores = len(os.sched_getaffinity(0))
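            # the root (coordinating) process uses one core and the
            # coincident background pool uses one core per detector pair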
            bg_cores = len(tuple(itertools.combinations(ifos, 2)))
            analysis_cores = 1 + bg_cores
            self.fu_cores = available_cores - analysis_cores
            if self.fu_cores <= 0:
                logging.warning('Insufficient number of CPU cores (%d) to '
                                'run search and trigger followups. Uploaded '
                                'triggers will momentarily increase the lag',
                                available_cores)
                self.fu_cores = 1

    def commit_results(self, results):
        self.comm.gather(results, root=0)

    def barrier(self):
        self.comm.Barrier()

    def barrier_status(self, status):
        return self.comm.allreduce(status, op=mpi.LAND)

    def gather_results(self):
        """ Collect results from the mpi subprocesses and collate them into
        contiguous sets of arrays.
        """

        if self.rank == 0:
            all_results = self.comm.gather(None, root=0)
            data_ends = [a[1] for a in all_results if a is not None]
            results = [a[0] for a in all_results if a is not None]

            combined = {}
            for ifo in results[0]:

                # skip this detector if any worker returned an invalid result
                if any(r[ifo] is False for r in results):
                    continue
                combined[ifo] = {}
                for key in results[0][ifo]:
                    combined[ifo][key] = numpy.concatenate([r[ifo][key] for r in results])

            return combined, data_ends[0]
        else:
            raise RuntimeError("Not root process")

    def compute_followup_data(self, ifos, triggers, data_readers, bank,
                              followup_ifos=None, recalculate_ifar=False):
        """Figure out which of the followup detectors are usable, and compute
        SNR time series for all the available detectors.
        """
        out = {}
        followup_ifos = [] if followup_ifos is None else followup_ifos

        template_id = triggers['foreground/' + ifos[0] + '/template_id']
        htilde = bank[template_id]

        coinc_times = {ifo: triggers['foreground/' + ifo + '/end_time'] for ifo in ifos}

        # Get the SNR series for the ifos that made the initial coinc
        for ifo in ifos:
            # NOTE we only check the state/DQ of followup IFOs here.
            # IFOs producing the coincidence are assumed to also
            # produce valid SNR series.
            snr_series = compute_followup_snr_series(
                    data_readers[ifo], htilde, coinc_times[ifo],
                    check_state=False)

            if snr_series is not None:
                out[ifo] = {'snr_series': snr_series}

        # Determine if the other ifos can contribute to the coincident event
        for ifo in followup_ifos:
            snr_series, ptime, pvalue, sigma2 = followup_event_significance(
                    ifo, data_readers[ifo], bank, template_id, coinc_times)
            if snr_series is not None:
                out[ifo] = {'snr_series': snr_series}
                self.get_followup_info(ifos[0], ifo, triggers, snr_series,
                                       ptime, pvalue, sigma2,
                                       recalculate_ifar=recalculate_ifar)

        # the SNR time series sample rate can vary slightly due to
        # rounding errors, so force all of them to be identical
        fix_delta_t = None
        for ifo in out:
            if 'snr_series' not in out[ifo]:
                continue
            if fix_delta_t is None:
                fix_delta_t = out[ifo]['snr_series']._delta_t
            else:
                out[ifo]['snr_series']._delta_t = fix_delta_t

        return out

    def get_followup_info(self, coinc_ifo, ifo, triggers, snr_series, ptime,
                          pvalue, sigma2, recalculate_ifar=False):
        # Copy the common fields from the other detector
        # ignore fields that contain detector-specific data
        fields_to_ignore = set(['end_time', 'snr', 'stat', 'coa_phase',
                                'chisq', 'chisq_dof', 'sg_chisq', 'sigmasq'])
        for key in set(triggers):
            if 'foreground/{}/'.format(coinc_ifo) in key:
                _, _, name = key.split('/')
                if name in fields_to_ignore:
                    continue
                triggers['foreground/{}/{}'.format(ifo, name)] = triggers[key]

        # Set the detector-specific fields for which we have data
        snr_series_peak = snr_series.at_time(ptime, nearest_sample=True)
        base = 'foreground/{}/'.format(ifo)
        triggers[base + 'end_time'] = float(ptime)
        triggers[base + 'snr'] = triggers[base + 'stat'] = abs(snr_series_peak)
        triggers[base + 'coa_phase'] = numpy.angle(snr_series_peak)
        triggers[base + 'sigmasq'] = sigma2
        if recalculate_ifar:
            # Calculate new ifar
            triggers['foreground/ifar'] = combine_ifar_pvalue(
                    triggers['foreground/ifar'], pvalue, self.pvalue_livetime)
            triggers['foreground/pvalue_{}'.format(ifo)] = pvalue

    def check_coincs(self, ifos, coinc_results, psds, f_low,
                     data_readers, bank):
        """ Perform any followup and save zerolag triggers to a coinc xml file
        """
        # check for a hardware injection
        for ifo in ifos:
            if data_readers[ifo].near_hwinj():
                coinc_results['HWINJ'] = True
                break

        if 'foreground/ifar' in coinc_results:
            logging.info('computing followup data for coinc')

            coinc_ifos = coinc_results['foreground/type'].split('-')
            followup_ifos = list(set(ifos) - set(coinc_ifos))

            double_ifar = coinc_results['foreground/ifar']
            if double_ifar < args.ifar_double_followup_threshold:
                coinc_results['foreground/NO_FOLLOWUP'] = True
                return

            fud = self.compute_followup_data(coinc_ifos, coinc_results,
                                             data_readers, bank,
                                             followup_ifos=followup_ifos,
                                             recalculate_ifar=True)

            live_ifos = [ifo for ifo in fud if 'snr_series' in fud[ifo]]

            event = SingleCoincForGraceDB(live_ifos, coinc_results, bank=bank,
                                          psds=psds, followup_data=fud,
                                          low_frequency_cutoff=f_low,
                                          channel_names=args.channel_name,
                                          mc_area_args=self.mc_area_args,
                                          )

            fname = 'coinc-{}-{}.xml.gz'.format(event.merger_time,
                                                pycbc.random_string(6))
            fname = os.path.join(self.path, fname)
            logging.info('Coincident candidate! Saving as %s', fname)

            # verbally explain some details not obvious from the other info
            comment = ('Trigger produced as a {} coincidence. '
                       'FAR is based on all listed detectors.<br />'
                       'Two-detector ranking statistic: {}<br />'
                       'Followup detectors: {}')
            comment = comment.format(ppdets(coinc_ifos),
                                     args.background_statistic,
                                     ppdets(followup_ifos))

            ifar = coinc_results['foreground/ifar']
            if self.enable_gracedb_upload and self.ifar_upload_threshold < ifar:
                gid = event.upload(fname, gracedb_server=args.gracedb_server,
                                   testing=self.gracedb_testing,
                                   extra_strings=[comment],
                                   search=args.gracedb_search)
            else:
                gid = None
                event.save(fname)

            if self.run_snr_optimization and self.ifar_upload_threshold < ifar:
                template_id = \
                    coinc_results['foreground/{}/template_id'.format(live_ifos[0])]
                p = props(bank.table[template_id])
                apr = p.pop('approximant')
                min_buffer = bank.minimum_buffer + 0.5
                buff_size = \
                    pycbc.waveform.get_waveform_filter_length_in_time(apr, **p)

                tlen = bank.round_up((buff_size + min_buffer) \
                    * bank.sample_rate)
                flen = int(tlen / 2 + 1)
                delta_f = bank.sample_rate / float(tlen)
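                # assemble a command line for the external pycbc_optimize_snr
                # follow-up process; the overwhitened data, PSDs and event
                # attributes it needs are written to disk below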
                cmd = 'timeout {} '.format(args.snr_opt_timeout)
                exepath = distutils.spawn.find_executable('pycbc_optimize_snr')
                cmd += exepath + ' '
                data_files_str = '--data-files '
                psd_files_str = '--psd-files '
                for ifo in ifos:
                    curr_fname = \
                        fname.replace('.xml.gz',
                                      '_{}_data_overwhitened.hdf'.format(ifo))
                    curr_data = data_readers[ifo].overwhitened_data(delta_f)
                    curr_data.save(curr_fname)
                    data_files_str += '{}:{} '.format(ifo, curr_fname)
                    curr_fname = fname.replace('.xml.gz',
                                               '_{}_psd.hdf'.format(ifo))
                    curr_psd = curr_data.psd
                    curr_psd.save(curr_fname)
                    psd_files_str += '{}:{} '.format(ifo, curr_fname)
                cmd += data_files_str
                cmd += psd_files_str

                curr_fname = fname.replace('.xml.gz', '_attributes.hdf')
                with h5py.File(curr_fname, 'w') as hdfp:
                    for ifo in coinc_ifos:
                        curr_time = \
                            coinc_results['foreground/{}/end_time'.format(ifo)]
                        hdfp['coinc_times/{}'.format(ifo)] = curr_time
                    f_end = bank.end_frequency(template_id)
                    if f_end is None or f_end >= (flen * delta_f):
                        f_end = (flen-1) * delta_f
                    hdfp['flen'] = flen
                    hdfp['delta_f'] = delta_f
                    hdfp['f_end'] = f_end
                    hdfp['sample_rate'] = bank.sample_rate
                    hdfp['flow'] = bank.table[template_id].f_lower
                    hdfp['mass1'] = bank.table[template_id].mass1
                    hdfp['mass2'] = bank.table[template_id].mass2
                    hdfp['spin1z'] = bank.table[template_id].spin1z
                    hdfp['spin2z'] = bank.table[template_id].spin2z
                    hdfp['template_duration'] = \
                        bank.table[template_id].template_duration
                    hdfp['ifar'] = ifar
                    if gid is not None:
                        hdfp['gid'] = gid

                    for ifo in args.channel_name:
                        hdfp['channel_names/{}'.format(ifo)] = \
                            args.channel_name[ifo]

                    recursively_save_dict_contents_to_group(hdfp,
                                                            'mc_area_args/',
                                                            self.mc_area_args)

                cmd += '--params-file {} '.format(curr_fname)
                cmd += '--approximant {} '.format(apr)
                cmd += '--gracedb-server {} '.format(args.gracedb_server)
                cmd += '--gracedb-search {} '.format(args.gracedb_search)
                if not self.gracedb_testing:
                    cmd += '--production '
                cmd += '--verbose '
                cmd += '--output-path {} '.format(self.path)
                if self.enable_gracedb_upload:
                    cmd += '--enable-gracedb-upload '

                cmd += '--cores {}'.format(self.fu_cores)

                log_fname = \
                        fname.replace('.xml.gz', '_optimize_snr_log.dat')

                # note: the '&>' redirection assumes a bash-compatible shell
                cmd += ' &> {}'.format(log_fname)
                logging.info('Running %s', cmd)
                subprocess.Popen(cmd, shell=True)


    def check_singles(self, results, data_reader, psds, f_low):
        active = [k for k in results if results[k] is not None]
        for ifo in active:
            single = sngl_estimator[ifo].check(results[ifo], data_reader[ifo])

            if single is None:
                continue

            followup_ifos = [i for i in active if i != ifo]
            fud = self.compute_followup_data([ifo], single, data_reader,
                                             bank, followup_ifos=followup_ifos,
                                             recalculate_ifar=False)
            ifar = single['foreground/ifar']
            # apply a trials factor of the number of active detectors
            ifar /= len(active)
            single['foreground/ifar'] = ifar

            live_ifos = [i for i in fud if 'snr_series' in fud[i]]

            event = SingleCoincForGraceDB(live_ifos, single, bank=bank,
                                          psds=psds, followup_data=fud,
                                          low_frequency_cutoff=f_low,
                                          channel_names=args.channel_name,
                                          mc_area_args=self.mc_area_args,
                                          )

            fname = 'single-%s-%s.xml.gz' % (ifo, event.merger_time)
            fname = os.path.join(self.path, fname)
            logging.info('Single-detector candidate! Saving as %s', fname)

            # verbally explain some details not obvious from the other info
            comment = ('Trigger produced as a {0} single. '
                       'FAR is based on {0} only.<br />'
                       'Followup detectors: {1}')
            comment = comment.format(ifo, ppdets(followup_ifos))

            if args.enable_single_detector_upload \
                    and self.ifar_upload_threshold < ifar:
                event.upload(fname, gracedb_server=args.gracedb_server,
                             testing=self.gracedb_testing,
                             extra_strings=[comment],
                             search=args.gracedb_search)
            else:
                event.save(fname)

    def dump(self, results, name, store_psd=False, time_index=None,
             store_loudest_index=False, raw_results=None, gates=None):
        """Save the results from this time block to an hdf output file.
        """
        if self.use_date_prefix:
            tm = lal.GPSToUTC(int(time_index))
            subdir = '{:04d}_{:02d}_{:02d}'.format(tm[0], tm[1], tm[2])
            makedir(os.path.join(self.path, subdir))
            fname = os.path.join(self.path, subdir, name) + '.hdf'
        else:
            makedir(self.path)
            fname = os.path.join(self.path, name) + '.hdf'

        with h5py.File(fname, 'w') as f:
            f.attrs['pycbc_version'] = version.git_verbose_msg
            f.attrs['command_line'] = sys.argv
            f.attrs['num_live_detectors'] = len(self.live_detectors)

            def h5py_unicode_workaround(stuff):
                # workaround for the fact that h5py cannot handle Unicode
                # numpy arrays that show up with Python 3
                if hasattr(stuff, 'dtype') and stuff.dtype.kind == 'U':
                    return [s.encode() for s in stuff]
                return stuff

            for ifo in results:
                for k in results[ifo]:
                    f['%s/%s' % (ifo, k)] = \
                            h5py_unicode_workaround(results[ifo][k])

            for key in raw_results:
                f[key] = h5py_unicode_workaround(raw_results[key])

            if store_loudest_index:
                for ifo in results:
                    if 'snr' in results[ifo]:
                        s = numpy.array(results[ifo]['snr'], ndmin=1)
                        c = numpy.array(results[ifo]['chisq'], ndmin=1)
                        nsnr = numpy.array(newsnr(s, c), ndmin=1) if len(s) > 0 else []

                        # loudest by newsnr
                        nloudest = numpy.argsort(nsnr)[::-1][0:store_loudest_index]

                        # loudest by snr
                        sloudest = numpy.argsort(s)[::-1][0:store_loudest_index]
                        f[ifo]['loudest'] = numpy.union1d(nloudest, sloudest)

            for ifo in (gates or {}):
                gate_dtype = [('center_time', float),
                              ('zero_half_width', float),
                              ('taper_width', float)]
                f['{}/gates'.format(ifo)] = \
                        numpy.array(gates[ifo], dtype=gate_dtype)

        for ifo in (store_psd or {}):
            if store_psd[ifo] is not None:
                store_psd[ifo].save(fname, group='%s/psd' % ifo)


parser = argparse.ArgumentParser(description=__doc__)
pycbc.waveform.bank.add_approximant_arg(parser)
parser.add_argument('--verbose', action='store_true')
parser.add_argument('--version', action='version', version=version.git_verbose_msg)
parser.add_argument('--bank-file', required=True,
                    help="Template bank file in XML or HDF format")
parser.add_argument('--low-frequency-cutoff', type=int,
                    help="Low-frequency cutoff in Hz")
parser.add_argument('--sample-rate', type=int,
                    help="Output sample rate in Hz")
parser.add_argument('--chisq-bins', help="Number of chisq bins")
parser.add_argument('--analysis-chunk', type=int, required=True,
                    help="Amount of data, in seconds, over which to produce "
                         "triggers in each analysis block")

parser.add_argument('--snr-threshold', type=float,
                    help='SNR threshold for generating a trigger')
parser.add_argument('--snr-abort-threshold', type=float)

parser.add_argument('--channel-name', action=MultiDetOptionAction, nargs='+',
                    required=True)
parser.add_argument('--state-channel', action=MultiDetOptionAction, nargs='+',
                    help="Channel containing frame status information. Used "
                         "to determine when to analyze the h(t) data. This "
                         "somewhat corresponds to CAT1 information")
parser.add_argument('--analyze-flags', action=MultiDetOptionAction, nargs='+',
                    help='The flags that must be in the "good" state to analyze data')
parser.add_argument('--data-quality-channel', action=MultiDetOptionAction, nargs='+',
                    help="Channel containing data quality information. Used "
                         "to determine when h(t) may be suspect; such times may "
                         "be used to veto triggers or to skip analyzing a "
                         "segment of data. This roughly corresponds to CAT2 "
                         "information")
parser.add_argument('--data-quality-flags', action=MultiDetOptionAction, nargs='+',
                    help='Flags used to determine when to throw triggers away. '
                         'For each detector, give a comma-separated list of flags.')
parser.add_argument('--data-quality-padding', type=float, default=0,
                    help='Time in seconds around a bad dq time to additionally remove triggers')
parser.add_argument('--frame-src', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--frame-type', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--force-update-cache', action='store_true')
parser.add_argument('--highpass-frequency', type=float,
                    help="Frequency to apply highpass filtering")
parser.add_argument('--highpass-reduction', type=float,
                    help="Amount in dB by which to reduce low frequencies")
parser.add_argument('--highpass-bandwidth', type=float,
                    help="Width of the highpass turnover region in Hz")
parser.add_argument('--psd-recalculate-difference', type=float, default=.01)
parser.add_argument('--psd-abort-difference', type=float, default=.20)
parser.add_argument('--psd-samples', type=int, required=True,
                    help="Number of PSD segments to use in the rolling estimate")
parser.add_argument('--psd-segment-length', type=int, required=True,
                    help="Length in seconds of each PSD segment")
parser.add_argument('--psd-inverse-length', type=float,
                    help="Length in time for the equivalent FIR filter")
parser.add_argument('--trim-padding', type=float, default=0.25,
                    help="Padding around the overwhitened analysis block")
parser.add_argument("--enable-bank-start-frequency", action='store_true',
                    help="Read the starting frequency of template waveforms"
                         " from the template bank")

parser.add_argument('--autogating-threshold', type=float, metavar='SIGMA',
                    help='If given, find and gate glitches '
                         'producing a deviation larger than '
                         'SIGMA in the whitened strain time '
                         'series.')
parser.add_argument('--autogating-pad', type=float, default=0.5,
                    metavar='SECONDS',
                    help='Ignore the given length of whitened '
                         'strain at the ends of a segment, to '
                         'avoid filters ringing.')
parser.add_argument('--autogating-cluster', type=float, default=1,
                    metavar='SECONDS',
                    help='Length of clustering window for '
                         'detecting glitches for autogating.')
parser.add_argument('--autogating-width', type=float, default=0.25,
                    metavar='SECONDS', help='Half-width of the gating window.')
parser.add_argument('--autogating-taper', type=float, metavar='SECONDS',
                    default=0.25,
                    help='Taper the strain before and after '
                         'each gating window over a duration '
                         'of SECONDS.')

parser.add_argument('--sync', action='store_true')
parser.add_argument('--increment-update-cache', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--frame-read-timeout', type=float, default=30)
parser.add_argument('--increment', type=int, default=8)

parser.add_argument('--start-time', type=int, default=None,
                    help='Start the analysis at the given GPS time')
parser.add_argument('--end-time', type=int, default=numpy.inf,
                    help='Stop the analysis at the given GPS time')

parser.add_argument('--output-path', required=True,
                    help='Path to a directory to store results in')
parser.add_argument('--output-status', type=str, metavar='PATH',
                    help='If given, PyCBC Live will periodically write JSON '
                         'status info to PATH, so the analysis can be '
                         'monitored via Nagios')
parser.add_argument('--day-hour-output-prefix', action='store_true')
parser.add_argument('--store-psd', action='store_true')
parser.add_argument('--output-background', type=str, nargs='+',
                    help='Takes a period in seconds and a file path and dumps '
                         'the coinc backgrounds to that path with that period')
parser.add_argument('--output-background-n-loudest', type=int, default=10000,
                    help="If given a positive integer, store only the loudest N "
                         "triggers (not sorted) for each coincident background. "
                         "If 0, store the full background.")

parser.add_argument('--newsnr-threshold', type=float, default=0)
parser.add_argument('--max-batch-size', type=int, default=2**27)
parser.add_argument('--store-loudest-index', type=int, default=0)
parser.add_argument('--max-psd-abort-distance', type=float, default=numpy.inf)
parser.add_argument('--min-psd-abort-distance', type=float, default=-numpy.inf)
parser.add_argument('--max-triggers-in-batch', type=int)
parser.add_argument('--max-length', type=float,
                    help='Maximum duration of templates, used to set the data buffer size')

parser.add_argument('--enable-profiling', type=int, metavar='RANK',
                    help='Dump out profiling information from the MPI process'
                         ' with given rank at the end of program execution')

parser.add_argument('--enable-background-estimation', default=False, action='store_true')
parser.add_argument('--ifar-upload-threshold', type=float, required=True,
                    help='Inverse-FAR threshold for uploading coincident '
                         'triggers to GraceDB, in years.')
parser.add_argument('--ifar-double-followup-threshold', type=float, required=True,
                    help='Inverse-FAR threshold to follow up double coincs with '
                         'additional detectors')
parser.add_argument('--enable-gracedb-upload', action='store_true', default=False,
                    help='Upload triggers to GraceDB')
parser.add_argument('--file-prefix', default='Live')
parser.add_argument('--enable-production-gracedb-upload', action='store_true', default=False,
                    help='Do not mark triggers uploaded to GraceDB as test '
                         'events. This option should *only* be enabled in '
                         'production analyses!')
parser.add_argument('--enable-single-detector-upload', action='store_true', default=False)
parser.add_argument('--pvalue-combination-livetime', type=float, required=True,
                    help="Livetime used for p-value combination with followup "
                         "detectors, in years")
parser.add_argument('--enable-single-detector-background', action='store_true', default=False)
parser.add_argument('--round-start-time', type=int, metavar='X',
                    help="Round up the start time to the nearest multiple of X "
                         "seconds. This is useful for forcing agreement with "
                         "frame file production.")
parser.add_argument('--gracedb-server', metavar='URL',
                    help='URL of GraceDB server API for uploading events. '
                         'If not provided, the default URL is used.')
parser.add_argument('--gracedb-search', type=str, default='AllSky',
                    help='String going into the "search" field of the GraceDB '
                         'events')
parser.add_argument('--size-override', type=int, metavar='N',
                    help="Override the internal MPI size layout. Useful for "
                         "debugging and running a portion of a bank")
parser.add_argument('--fftw-planning-limit', type=float,
                    help="Time in seconds to allow for a plan to be created")
parser.add_argument('--run-snr-optimization', action='store_true',
                    default=False,
                    help='Run spawned followup processes to maximize SNR for '
                         'any trigger uploaded to GraceDB')
parser.add_argument('--snr-opt-timeout', type=int, default=400, metavar='SECONDS',
                    help='Maximum allowed duration of followup process to maximize SNR')


scheme.insert_processing_option_group(parser)
LiveSingle.insert_args(parser)
fft.insert_fft_option_group(parser)
Coincer.insert_args(parser)
SingleDetSGChisq.insert_option_group(parser)
mchirp_area.insert_args(parser)
args = parser.parse_args()
scheme.verify_processing_options(args, parser)
fft.verify_fft_options(args, parser)

if args.output_background is not None and len(args.output_background) != 2:
    parser.error('--output-background takes two parameters: period and path')

log_format = "%(asctime)s {0} %(message)s".format(platform.node())
pycbc.init_logging(args.verbose, format=log_format)

ctx = scheme.from_cli(args)
fft.from_cli(args)

# Approximate guess of the total padding
valid_pad = args.analysis_chunk
total_pad = args.trim_padding * 2 + valid_pad
lfc = None if args.enable_bank_start_frequency else args.low_frequency_cutoff
bank = waveform.LiveFilterBank(
        args.bank_file, args.sample_rate, total_pad, low_frequency_cutoff=lfc,
        approximant=args.approximant, increment=args.increment)
if bank.min_f_lower < args.low_frequency_cutoff:
    parser.error('--low-frequency-cutoff ({} Hz) must not be larger than the '
                 'minimum f_lower across all templates '
                 '({} Hz)'.format(args.low_frequency_cutoff, bank.min_f_lower))

# ifos used for primary analysis
ifos = set(args.channel_name.keys())
logging.info('Running with %s', ppdets(ifos))

evnt = LiveEventManager(args.output_path,
                        use_date_prefix=args.day_hour_output_prefix,
                        ifar_upload_threshold=args.ifar_upload_threshold,
                        pval_livetime=args.pvalue_combination_livetime,
                        enable_gracedb_upload=args.enable_gracedb_upload,
                        gracedb_testing=not args.enable_production_gracedb_upload,
                        run_snr_optimization=args.run_snr_optimization,
                        mc_area_args=mchirp_area.from_cli(args))

sg_chisq = SingleDetSGChisq.from_cli(args, bank, args.chisq_bins)

if args.size_override:
    evnt.size = args.size_override

# make sure we can talk to GraceDB
if evnt.rank == 0 and args.enable_gracedb_upload:
    logging.info('Testing access to GraceDB')
    from ligo.gracedb.rest import GraceDb
    gdb_client = GraceDb(args.gracedb_server) if args.gracedb_server else GraceDb()
    response = gdb_client.ping()
    logging.info('GraceDB ping response: %s %s',
                 response.status, response.reason)
    del gdb_client

# All ranks enter the processing scheme context; the worker ranks do the
# actual filtering, while rank 0 handles coincidence, follow-up and output.
with ctx:
    try:
        # Import system wisdom.
        if args.fftw_import_system_wisdom:
            fft.fftw.import_sys_wisdom()

        # Read specified user-provided wisdom files
        if args.fftw_input_float_wisdom_file is not None:
            fft.fftw.import_single_wisdom_from_filename(args.fftw_input_float_wisdom_file)

        if args.fftw_input_double_wisdom_file is not None:
            fft.fftw.import_double_wisdom_from_filename(args.fftw_input_double_wisdom_file)

        if args.fftw_planning_limit:
            fft.fftw.set_planning_limit(args.fftw_planning_limit)
    except (ValueError, RuntimeError) as e:
        print(e)
        sys.exit()

    maxlen = args.psd_segment_length * (args.psd_samples // 2 + 1)
    if evnt.rank > 0:
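        # sort templates by chirp mass and distribute them cyclically across
        # the filtering (nonzero-rank) processes, which roughly balances the
        # template durations handled by each worker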
        bank.table.sort(order='mchirp')
        waveforms = list(bank[evnt.rank-1::evnt.size-1])
        lengths = numpy.array([1.0 / wf.delta_f for wf in waveforms])
        psd_len = args.psd_segment_length * (args.psd_samples // 2 + 1)
        maxlen = max(lengths.max(), psd_len)
        mf = LiveBatchMatchedFilter(waveforms, args.snr_threshold,
                                    args.chisq_bins, sg_chisq,
                                    snr_abort_threshold=args.snr_abort_threshold,
                                    newsnr_threshold=args.newsnr_threshold,
                                    max_triggers_in_batch=args.max_triggers_in_batch,
                                    maxelements=args.max_batch_size)

    # Synchronize start time if not provided on the command line
    if not args.start_time:
        evnt.barrier()
        tnow = lal.GPSTimeNow() if evnt.rank == 0 else None
        args.start_time = evnt.comm.bcast(tnow, root=0)

    if args.round_start_time:
        args.start_time = int(args.start_time / args.round_start_time + 1) * args.round_start_time
        logging.info('Starting from: %s', args.start_time)

    # initialize the data readers for all detectors
    if args.max_length is not None:
        maxlen = args.max_length
    maxlen = int(maxlen)
    data_reader = {ifo: StrainBuffer.from_cli(ifo, args, maxlen)
                   for ifo in ifos}

    # create single-detector background "estimators"
    if args.enable_single_detector_background and evnt.rank == 0:
        sngl_estimator = {ifo: LiveSingle.from_cli(args, ifo)
                          for ifo in ifos}

    # Create double coincident background estimator for every combo
    if args.enable_background_estimation and evnt.rank == 0:
        ifo_combos = itertools.combinations(ifos, 2)
        estimators = []
        for combo in ifo_combos:
            logging.info('Will calculate %s background', combo)
            estimators.append(Coincer.from_cli(args,
                              len(bank), args.analysis_chunk, list(combo)))

        my_coinc_id = 999999
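        # placeholder value; each process in the coinc pool is assigned its
        # own estimator index below via set_coinc_id, then uses it in
        # get_coinc and output_background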
        def set_coinc_id(i):
            global my_coinc_id
            my_coinc_id = i

        def get_coinc(results):
            c = estimators[my_coinc_id]
            r = c.add_singles(results)
            logging.info('Coincs %i: %s-%s: %s in cbuffer', my_coinc_id,
                         c.ifos[0], c.ifos[1], c.coincs.index)
            return r

        def output_background(_):
            estim = estimators[my_coinc_id]
            bg_time = estim.background_time / lal.YRJUL_SI
            return estim.ifos, estim.coincs.data, bg_time

        coinc_pool = BroadcastPool(len(estimators))
        coinc_pool.allmap(set_coinc_id, range(len(estimators)))

    logging.info('%s: Starting...', evnt.rank)

    if args.enable_profiling is not None and evnt.rank == args.enable_profiling:
        pr = cProfile.Profile()
        pr.enable()

    # main analysis loop
    data_end = lambda: data_reader[tuple(data_reader.keys())[0]].end_time
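    # data_end() gives the GPS end time of the current strain block; all
    # detector buffers are advanced together, so any single reader works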
    last_bg_dump_time = int(data_end())
    while data_end() < args.end_time:
        t1 = time()
        logging.info('%s: Analyzing from %s', evnt.rank, data_end())

        results = {}
        evnt.live_detectors = set()

        for ifo in ifos:
            results[ifo] = False
            status = data_reader[ifo].advance(valid_pad, timeout=args.frame_read_timeout)

            if status is True:
                status = data_reader[ifo].recalculate_psd()

            if data_reader[ifo].psd is not None:
                dist = data_reader[ifo].psd.dist
                if dist < args.min_psd_abort_distance or dist > args.max_psd_abort_distance:
                    logging.info("%s PSD dist %s outside acceptable range [%s, %s]",
                                 ifo, dist, args.min_psd_abort_distance,
                                 args.max_psd_abort_distance)
                    status = False

            if status is True:
                evnt.live_detectors.add(ifo)
                if evnt.rank > 0:
                    logging.info('%s: Filtering %s', evnt.rank, ifo)
                    results[ifo] = mf.process_data(data_reader[ifo])
            else:
                logging.info('Insufficient data for %s analysis', ifo)

        if evnt.rank > 0:
            evnt.commit_results((results, data_end()))
        else:
            psds = {ifo: data_reader[ifo].psd for ifo in data_reader if data_reader[ifo].psd is not None}

            # Collect together the single detector triggers
            if evnt.size > 1:
                results, valid_end = evnt.gather_results()

            # veto detectors with different state between the master
            # and worker nodes (e.g. late frame files on one node only)
            detectors_with_results = set(results.keys())
            evnt.live_detectors.intersection_update(detectors_with_results)
            for ifo in detectors_with_results - evnt.live_detectors:
                results.pop(ifo)

            # Veto single detector triggers that fail the DQ vector
            for ifo in results:
                if data_reader[ifo].dq is None:
                    continue
                logging.info("Checking %s's DQ vector", ifo)
                start = data_reader[ifo].start_time
                times = results[ifo]['end_time']
                idx = data_reader[ifo].dq.indices_of_flag(
                        start, valid_pad, times,
                        padding=data_reader[ifo].dq_padding)
                logging.info('Keeping %d/%d %s triggers',
                             len(idx), len(times), ifo)
                for key in results[ifo]:
                    if len(results[ifo][key]):
                        results[ifo][key] = results[ifo][key][idx]

            # Look for coincident triggers and do background estimation
            if args.enable_background_estimation:
                coinc_results = coinc_pool.broadcast(get_coinc, results)

                # Pick the best coinc in this chunk
                best_coinc = Coincer.pick_best_coinc(coinc_results)

                evnt.check_coincs(list(results.keys()), best_coinc,
                                  psds, args.low_frequency_cutoff,
                                  data_reader, bank)

            # Check for singles
            if args.enable_single_detector_background:
                evnt.check_singles(results, data_reader, psds,
                                   args.low_frequency_cutoff)

            gates = {ifo: data_reader[ifo].gate_params for ifo in data_reader}

            # map the results file to an hdf file
            prefix = '{}-{}-{}-{}'.format(''.join(sorted(ifos)),
                                          args.file_prefix,
                                          data_end() - args.analysis_chunk,
                                          valid_pad)

            evnt.dump(results, prefix, time_index=data_end(),
                      store_psd=(psds if args.store_psd else False),
                      store_loudest_index=args.store_loudest_index,
                      raw_results=best_coinc, gates=gates)

            # dump the background if needed
            if args.output_background and \
                    data_end() - last_bg_dump_time > float(args.output_background[0]):
                last_bg_dump_time = int(data_end())
                bg_dists = coinc_pool.broadcast(output_background, None)
                bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(''.join(sorted(ifos)),
                                                           last_bg_dump_time)
                bg_fn = os.path.join(args.output_background[1], bg_fn)
                with h5py.File(bg_fn, 'w') as bgf:
                    for bg_ifos, bg_data, bg_time in bg_dists:
                        n_loudest = args.output_background_n_loudest
                        if n_loudest and n_loudest < len(bg_data) - 1:
                            assert n_loudest > 0, \
                                "--output-background-n-loudest must be positive"
                            # keep only the n_loudest largest statistic values
                            ds = bgf.create_dataset(
                                    ','.join(sorted(bg_ifos)),
                                    data=-numpy.partition(-bg_data, n_loudest)[:n_loudest],
                                    compression='gzip')
                        else:
                            ds = bgf.create_dataset(','.join(sorted(bg_ifos)),
                                                    data=bg_data, compression='gzip')
                        ds.attrs['background_time'] = bg_time
                    bgf.attrs['gps_time'] = last_bg_dump_time

            logging.info('Finished analyzing up to %s', data_end())

        if args.sync:
            evnt.barrier()
        tdiff = time() - t1
        lag = float(lal.GPSTimeNow() - data_end())
        logging.info('%s: Took %.2f s, duty factor of %.2f, '
                     'lag %.2f s, %d live detectors',
                     evnt.rank, tdiff, tdiff / valid_pad, lag,
                     len(evnt.live_detectors))

        if args.output_status is not None and evnt.rank == 0:
            if lag > 120:
                status_intervals = [{'num_status': 2,
                                     'txt_status': 'CRITICAL: lag greater than 2 min',
                                     'start_sec': 0}]
            else:
                status_intervals = [{'num_status': 0,
                                     'txt_status': 'OK: No reported problems',
                                     'start_sec': 0},
                                    {'num_status': 1,
                                     'txt_status': 'WARNING: last report between 2 and 4 min ago',
                                     'start_sec': 120}]
            status_intervals.append({'num_status': 2,
                                     'txt_status': 'CRITICAL: last report more than 4 min ago',
                                     'start_sec': 240})
            status = {'author': 'Tito Dal Canton',
                      'email': 'tito.canton@ligo.org',
                      'created_gps': int(lal.GPSTimeNow()),
                      'status_intervals': status_intervals}
            try:
                with open(args.output_status, 'w') as status_fp:
                    json.dump(status, status_fp)
            except IOError:
                logging.error('I/O error writing status JSON file! '
                              'Hopefully it works next time')

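# only one representative filtering process (rank 1) exports FFTW wisdom;
# rank 0 does not perform template filtering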
if evnt.rank == 1:
    if args.fftw_output_float_wisdom_file:
        fft.fftw.export_single_wisdom_to_filename(args.fftw_output_float_wisdom_file)

    if args.fftw_output_double_wisdom_file:
        fft.fftw.export_double_wisdom_to_filename(args.fftw_output_double_wisdom_file)

if args.enable_profiling is not None and evnt.rank == args.enable_profiling:
    pr.dump_stats('profiling_rank_{:03d}'.format(evnt.rank))