swh:1:snp:3a699297f000109a1bc833f294a54171df990207
Raw File
Tip revision: 3f8c4b8adbfe4f9e9a243f8ba3eb2a1da1b677a3 authored by Tito Dal Canton on 02 March 2023, 13:19:16 UTC
Change `setup.py` for 2.1.0 release (#4274)
Tip revision: 3f8c4b8
pycbc_live
#!/usr/bin/env python

"""Detect compact-binary-merger gravitational-wave signals in low latency. This
program takes a fixed template bank and performs matched filtering on a
continuous stream of strain data in fixed short blocks of time. It is capable
of reading from low latency data directories on the LDG clusters. Work is
parallelized through the use of MPI, so this script should typically be
launched with mpirun.

See https://arxiv.org/abs/1805.11174 for an overview."""

import sys
import argparse, numpy, pycbc, logging, cProfile, h5py, lal, json
import os.path
import itertools
import platform
import subprocess
from multiprocessing.dummy import threading
from matplotlib import use
use('agg')
from shutil import which
from mpi4py import MPI as mpi
from astropy.time import Time
from pycbc.pool import BroadcastPool
from pycbc import fft, version, waveform, scheme, makedir
from pycbc.types import MultiDetOptionAction, MultiDetMultiColonOptionAction
from pycbc.filter import LiveBatchMatchedFilter, compute_followup_snr_series
from pycbc.filter import followup_event_significance
from pycbc.strain import StrainBuffer
from pycbc.events.ranking import newsnr
from pycbc.events.coinc import LiveCoincTimeslideBackgroundEstimator as Coincer
from pycbc.events.single import LiveSingle
from pycbc.io.live import CandidateForGraceDB
from pycbc.io.hdf import recursively_save_dict_contents_to_group
from pycbc.types import positive_int
import pycbc.waveform.bank
from pycbc.vetoes.sgchisq import SingleDetSGChisq
from pycbc.waveform.waveform import props
from pycbc.population import live_pastro_utils as livepau
from pycbc import mchirp_area
from pycbc.detector import ppdets
from pycbc.filter import resample
from pycbc.psd import estimate

# Use cached class-based FFTs in the resample and estimate module
resample.USE_CACHING_FOR_LFILTER = True
estimate.USE_CACHING_FOR_WELCH_FFTS = True
estimate.USE_CACHING_FOR_INV_SPEC_TRUNC = True

try:
    from setproctitle import setproctitle
except ImportError:
    def setproctitle(title):
        pass


def ptof(p, ft):
    """Convert p-value to FAR via foreground time `ft`.
    """
    return numpy.log1p(-p) / -ft

def ftop(f, ft):
    """Convert FAR to p-value via foreground time `ft`.
    """
    return 1 - numpy.exp(-ft * f)

def combine_ifar_pvalue(ifar, pvalue, livetime):
    """Convert original IFAR to p-value and combine with followup p-value.
    """
    from scipy.stats import combine_pvalues
    # NB units of ifar and livetime must be the same
    _, pnew = combine_pvalues([ftop(1. / ifar, livetime), pvalue])
    nifar = 1. / ptof(pnew, livetime)
    # take max of original IFAR and combined IFAR & apply trials factor
    return numpy.maximum(ifar, nifar) / 2.


class LiveEventManager(object):
    def __init__(self, args, bank):
        self.low_frequency_cutoff = args.low_frequency_cutoff
        self.bank = bank

        # Figure out what we are supposed to process within the pool of MPI processes
        self.comm = mpi.COMM_WORLD
        self.size = self.comm.Get_size()
        self.rank = self.comm.Get_rank()

        if self.rank > 0:
            # We are a matched-filtering process, so nothing else is needed
            return

        self.path = args.output_path
        self.mc_area_args = mchirp_area.from_cli(args, parser)
        self.padata = livepau.PAstroData(args.p_astro_spec, args.bank_file)
        self.use_date_prefix = args.day_hour_output_prefix
        self.ifar_upload_threshold = args.ifar_upload_threshold
        self.pvalue_livetime = args.pvalue_combination_livetime
        self.gracedb_testing = not args.enable_production_gracedb_upload
        self.enable_gracedb_upload = args.enable_gracedb_upload
        self.run_snr_optimization = args.run_snr_optimization
        self.gracedb = None

        # Keep track of which events have been uploaded
        self.last_few_coincs_uploaded = []

        if self.run_snr_optimization:
            # preestimate the number of CPU cores that we can afford giving
            # to followup processes without slowing down the main search
            available_cores = len(os.sched_getaffinity(0))
            bg_cores = len(tuple(itertools.combinations(ifos, 2)))
            analysis_cores = 1 + bg_cores
            self.fu_cores = available_cores - analysis_cores
            if self.fu_cores <= 0:
                logging.warning('Insufficient number of CPU cores (%d) to '
                                'run search and trigger followups. Uploaded '
                                'triggers will momentarily increase the lag',
                                available_cores)
                self.fu_cores = 1

    def commit_results(self, results):
        logging.info('Committing triggers')
        self.comm.gather(results, root=0)

    def barrier(self):
        self.comm.Barrier()

    def barrier_status(self, status):
        return self.comm.allreduce(status, op=mpi.LAND)

    def gather_results(self):
        """ Collect results from the mpi subprocesses and collate them into
        contiguous sets of arrays.
        """

        if self.rank != 0:
            raise RuntimeError('Not root process')

        logging.info('Gathering triggers')
        all_results = self.comm.gather(None, root=0)
        data_ends = [a[1] for a in all_results if a is not None]
        results = [a[0] for a in all_results if a is not None]

        combined = {}
        for ifo in results[0]:
            # check if any of the results returned invalid
            try:
                for r in results:
                    if r[ifo] is False:
                        raise ValueError
            except ValueError:
                continue
            combined[ifo] = {}
            for key in results[0][ifo]:
                combined[ifo][key] = numpy.concatenate([r[ifo][key] for r in results])

        return combined, data_ends[0]

    def compute_followup_data(self, ifos, triggers,
                              followup_ifos=None, recalculate_ifar=False):
        """Figure out which of the followup detectors are usable, and compute
        SNR time series for all the available detectors.
        """
        out = {}
        followup_ifos = [] if followup_ifos is None else followup_ifos

        template_id = triggers[f'foreground/{ifos[0]}/template_id']
        htilde = self.bank[template_id]

        coinc_times = {
            ifo: triggers[f'foreground/{ifo}/end_time'] for ifo in ifos
        }

        # Get the SNR series for the ifos that made the initial coinc
        for ifo in ifos:
            # NOTE we only check the state/DQ of followup IFOs here.
            # IFOs producing the coincidence are assumed to also
            # produce valid SNR series.
            snr_series = compute_followup_snr_series(
                self.data_readers[ifo],
                htilde,
                coinc_times[ifo],
                check_state=False
            )

            if snr_series is not None:
                out[ifo] = {'snr_series': snr_series}

        # Determine if the other ifos can contribute to the coincident event
        for ifo in followup_ifos:
            snr_series, ptime, pvalue, sigma2 = followup_event_significance(
                ifo,
                self.data_readers[ifo],
                self.bank,
                template_id,
                coinc_times
            )
            if snr_series is not None:
                out[ifo] = {'snr_series': snr_series}
                self.get_followup_info(ifos[0], ifo, triggers, snr_series,
                                       ptime, pvalue, sigma2,
                                       recalculate_ifar=recalculate_ifar)

        # the SNR time series sample rate can vary slightly due to
        # rounding errors, so force all of them to be identical
        fix_delta_t = None
        for ifo in out:
            if 'snr_series' not in out[ifo]:
                continue
            if fix_delta_t is None:
                fix_delta_t = out[ifo]['snr_series']._delta_t
            else:
                out[ifo]['snr_series']._delta_t = fix_delta_t

        return out

    def get_followup_info(self, coinc_ifo, ifo, triggers, snr_series, ptime,
                          pvalue, sigma2, recalculate_ifar=False):
        # Copy the common fields from the other detector
        # ignore fields that contain detector-specific data
        fields_to_ignore = set(['end_time', 'snr', 'stat', 'coa_phase',
                                'chisq', 'chisq_dof', 'sg_chisq', 'sigmasq'])
        for key in set(triggers):
            if 'foreground/{}/'.format(coinc_ifo) in key:
                _, _, name = key.split('/')
                if name in fields_to_ignore:
                    continue
                triggers['foreground/{}/{}'.format(ifo, name)] = triggers[key]

        # Set the detector-specific fields for which we have data
        snr_series_peak = snr_series.at_time(ptime, nearest_sample=True)
        base = 'foreground/{}/'.format(ifo)
        triggers[base + 'end_time'] = float(ptime)
        triggers[base + 'snr'] = triggers[base + 'stat'] = abs(snr_series_peak)
        triggers[base + 'coa_phase'] = numpy.angle(snr_series_peak)
        triggers[base + 'sigmasq'] = sigma2
        if recalculate_ifar:
            # Calculate new ifar
            triggers['foreground/ifar'] = combine_ifar_pvalue(
                    triggers['foreground/ifar'], pvalue, self.pvalue_livetime)
            triggers['foreground/pvalue_{}'.format(ifo)] = pvalue

    def setup_optimize_snr(
        self, results, live_ifos, triggering_ifos, fname, gid
    ):
        """Setup and start the network SNR optimization process for a
        candidate event. See arXiv:2008.07494 for details.

        Parameters
        ----------
        results : dict
            Information about the candidate.
        live_ifos : list
            List of detectors with usable data.
        triggering_ifos : list
            List of detectors that originally identified the candidate,
            providing reliabile estimates of the merger time. Must not be a
            superset of `live_ifos`.
        fname : str
            File name where the candidate information has been saved.
            Used to name other files produced as part of setting up the
            SNR optimization.
        gid : str
            GraceDB ID of the candidate.
        """
        template_id = results[f'foreground/{live_ifos[0]}/template_id']
        p = props(self.bank.table[template_id])
        p.pop('approximant')
        apr = self.bank.approximant(template_id)
        min_buffer = self.bank.minimum_buffer + 0.5
        buff_size = \
            pycbc.waveform.get_waveform_filter_length_in_time(apr, **p)

        tlen = self.bank.round_up((buff_size + min_buffer) \
            * self.bank.sample_rate)
        flen = int(tlen / 2 + 1)
        delta_f = self.bank.sample_rate / float(tlen)
        cmd = 'timeout {} '.format(args.snr_opt_timeout)
        exepath = which('pycbc_optimize_snr')
        cmd += exepath + ' '
        data_fils_str = '--data-files '
        psd_fils_str = '--psd-files '
        for ifo in live_ifos:
            curr_fname = \
                fname.replace('.xml.gz',
                              '_{}_data_overwhitened.hdf'.format(ifo))
            curr_data = self.data_readers[ifo].overwhitened_data(delta_f)
            curr_data.save(curr_fname)
            data_fils_str += '{}:{} ' .format(ifo, curr_fname)
            curr_fname = fname.replace('.xml.gz',
                                       '_{}_psd.hdf'.format(ifo))
            curr_psd = curr_data.psd
            curr_psd.save(curr_fname)
            psd_fils_str += '{}:{} ' .format(ifo, curr_fname)
        cmd += data_fils_str
        cmd += psd_fils_str

        curr_fname = fname.replace('.xml.gz', '_attributes.hdf')
        with h5py.File(curr_fname, 'w') as hdfp:
            for ifo in triggering_ifos:
                curr_time = results[f'foreground/{ifo}/end_time']
                hdfp[f'coinc_times/{ifo}'] = curr_time
            f_end = self.bank.end_frequency(template_id)
            if f_end is None or f_end >= (flen * delta_f):
                f_end = (flen-1) * delta_f
            hdfp['flen'] = flen
            hdfp['delta_f'] = delta_f
            hdfp['f_end'] = f_end
            hdfp['sample_rate'] = self.bank.sample_rate
            hdfp['flow'] = self.bank.table[template_id].f_lower
            hdfp['mass1'] = self.bank.table[template_id].mass1
            hdfp['mass2'] = self.bank.table[template_id].mass2
            hdfp['spin1z'] = self.bank.table[template_id].spin1z
            hdfp['spin2z'] = self.bank.table[template_id].spin2z
            hdfp['template_duration'] = \
                self.bank.table[template_id].template_duration
            hdfp['ifar'] = results['foreground/ifar']
            if 'p_terr' in results:
                hdfp['p_terr'] = results['p_terr']
            if gid is not None:
                hdfp['gid'] = gid

            for ifo in args.channel_name:
                hdfp[f'channel_names/{ifo}'] = args.channel_name[ifo]

            recursively_save_dict_contents_to_group(hdfp,
                                                    'mc_area_args/',
                                                    self.mc_area_args)

        cmd += '--params-file {} '.format(curr_fname)
        cmd += '--approximant {} '.format(apr)
        cmd += '--gracedb-server {} '.format(args.gracedb_server)
        cmd += '--gracedb-search {} '.format(args.gracedb_search)
        if not self.gracedb_testing:
            cmd += '--production '
        cmd += '--verbose '

        # set up a place for storing the SNR optimization results and log
        out_dir_path = os.path.join(os.path.dirname(fname), 'optimize_snr')
        makedir(out_dir_path)
        cmd += f'--output-path {out_dir_path} '

        if self.enable_gracedb_upload:
            cmd += '--enable-gracedb-upload '

        cmd += '--cores {} '.format(self.fu_cores)
        if args.processing_scheme:
            # we will use the cores for multiple workers of the
            # optimization routine, so we force the processing scheme
            # to a single core here.  this may be enforcing some
            # assumptions about the optimal way to do ffts on the
            # machine. however, the dominant cost of pycbc_optimize_snr
            # is expected to be in waveform generation, which is
            # unlikely to benefit from a processing scheme with more
            # than 1 thread anyway.
            opt_scheme = args.processing_scheme.split(':')[0]
            cmd += '--processing-scheme {}:1 '.format(opt_scheme)

        log_fname = os.path.join(out_dir_path, 'optimize_snr.log')

        logging.info('running %s with log to %s', cmd, log_fname)
        with open(log_fname, "w") as logfile:
            subprocess.Popen(
                cmd, shell=True, stdout=logfile, stderr=logfile
            )

    def create_gdb(self):
        """
        Function to create GraceDB session.
        """
        from ligo.gracedb.rest import GraceDb
        logging.info('Creating GraceDB session')
        # Set up dict for args that reload the certificate if it will expire in the
        # reload_buffer time. These args are documented here:
        # https://ligo-gracedb.readthedocs.io/en/latest/api.html#ligo.gracedb.rest.GraceDb
        # Because we do not change any of the request session values when running the
        # code, it should remain thread safe. 
        gdbargs = {'reload_certificate': True, 'reload_buffer': 300}
        if args.gracedb_server:
            gdbargs['service_url'] = args.gracedb_server 
        self.gracedb = GraceDb(**gdbargs)

    def upload_in_thread(self, event, fname, comment, results, live_ifos, ifos,
                         upload_checks, optimize_snr_checks, gracedb_server, gracedb_search):
        gid = None
        if upload_checks:
            gid = event.upload(
                fname,
                gracedb_server=gracedb_server,
                testing=self.gracedb_testing,
                extra_strings=[comment],
                search=gracedb_search
            )
        if optimize_snr_checks:
            self.setup_optimize_snr(
                results,
                live_ifos,
                ifos,
                fname,
                gid
            )

    def check_coincs(self, ifos, coinc_results, psds):
        """ Perform any followup and save zerolag triggers to a coinc xml file
        """
        # Check for hardware injection
        for ifo in ifos:
            if self.data_readers[ifo].near_hwinj():
                coinc_results['HWINJ'] = True
                break

        if 'foreground/ifar' not in coinc_results:
            return

        logging.info('computing followup data for coinc')
        coinc_ifos = coinc_results['foreground/type'].split('-')
        followup_ifos = list(set(ifos) - set(coinc_ifos))

        double_ifar = coinc_results['foreground/ifar']
        if double_ifar < args.ifar_double_followup_threshold:
            coinc_results['foreground/NO_FOLLOWUP'] = True
            return

        sld = self.compute_followup_data(
            coinc_ifos,
            coinc_results,
            followup_ifos=followup_ifos,
            recalculate_ifar=True
        )

        event = CandidateForGraceDB(
            coinc_ifos,
            ifos,
            coinc_results,
            gracedb=self.gracedb,
            padata=self.padata,
            bank=self.bank,
            psds=psds,
            skyloc_data=sld,
            low_frequency_cutoff=self.low_frequency_cutoff,
            channel_names=args.channel_name,
            mc_area_args=self.mc_area_args,
        )

        fname = os.path.join(
            self.get_candidate_path(event.merger_time),
            f'coinc-{event.merger_time:.3f}.xml.gz'
        )
        logging.info('Coincident candidate! Saving as %s', fname)

        # Verbally explain some details not obvious from the other info
        comment = ('Trigger produced as a {} coincidence. '
                   'FAR is based on all listed detectors.<br />'
                   'Two-detector ranking statistic: {}<br />'
                   'Followup detectors: {}')
        comment = comment.format(ppdets(coinc_ifos),
                                 args.ranking_statistic,
                                 ppdets(followup_ifos))

        ifar = coinc_results['foreground/ifar']
        upload_checks = self.enable_gracedb_upload and self.ifar_upload_threshold < ifar
        optimize_snr_checks = self.run_snr_optimization and self.ifar_upload_threshold < ifar
        
        # Keep track of the last few coincs uploaded in order to
        # prevent singles being uploaded as well for coinc events
        self.last_few_coincs_uploaded.append(event.merger_time)
        # Only need to keep a few (10) events
        self.last_few_coincs_uploaded = \
            self.last_few_coincs_uploaded[-10:]

        if not upload_checks:
            event.save(fname)
        if upload_checks or optimize_snr_checks:
            if optimize_snr_checks:
                logging.info('Optimizing SNR for coinc above threshold ..')
                # Tell snr optimized event about p_terr
                if hasattr(event, 'p_terr') and event.p_terr is not None:
                    coinc_results['p_terr'] = event.p_terr
            live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]]
            thread_args = (
                event, fname, comment, coinc_results, live_ifos, coinc_ifos,
                upload_checks, optimize_snr_checks, args.gracedb_server, 
                args.gracedb_search,
            )
            gdb_upload_thread = threading.Thread(target=self.upload_in_thread,
                                                 args=thread_args)
            gdb_upload_thread.start()

    def check_singles(self, results, psds):
        active = [k for k in results if results[k] is not None]
        for ifo in active:
            single = sngl_estimator[ifo].check(
                results[ifo],
                self.data_readers[ifo]
            )

            if single is None:
                continue

            sifar = single['foreground/ifar']
            logging.info(f'Found {ifo} single with ifar {sifar}')

            followup_ifos = [i for i in active if i is not ifo]
            # Don't recompute ifar considering other ifos
            sld = self.compute_followup_data(
                [ifo],
                single,
                followup_ifos=followup_ifos,
                recalculate_ifar=False
            )
            # Apply a trials factor of the number of active detectors
            single['foreground/ifar'] = sifar / len(active)

            event = CandidateForGraceDB(
                [ifo],
                active,
                single,
                gracedb=self.gracedb,
                padata=self.padata,
                bank=self.bank,
                psds=psds,
                skyloc_data=sld,
                low_frequency_cutoff=self.low_frequency_cutoff,
                channel_names=args.channel_name,
                mc_area_args=self.mc_area_args,
            )

            fname = os.path.join(
                self.get_candidate_path(event.merger_time),
                f'single-{ifo}-{event.merger_time:.3f}.xml.gz'
            )
            logging.info('Single-detector candidate! Saving as %s', fname)

            # Verbally explain some details not obvious from the other info
            comment = ('Trigger produced as a {0} single. '
                       'FAR is based on {0} only.<br />'
                       'Followup detectors: {1}')
            comment = comment.format(ifo, ppdets(followup_ifos))

            # Has a coinc event at this time been uploaded recently?
            # If so, skip upload
            coinc_tdiffs = abs(event.merger_time -
                               numpy.array(self.last_few_coincs_uploaded))
            nearby_coincs = coinc_tdiffs < 0.1
            if any(nearby_coincs):
                logging.info(
                    "Single-detector candidate at time %.3f was already "
                    "reported as coinc, skipping upload",
                    event.merger_time
                )

            upload_checks = args.enable_single_detector_upload and \
                self.ifar_upload_threshold < single['foreground/ifar'] and \
                not any(nearby_coincs)
            optimize_snr_checks = \
                self.ifar_upload_threshold < single['foreground/ifar'] and \
                not any(nearby_coincs) and \
                self.run_snr_optimization
            if not upload_checks:
                event.save(fname)
            if upload_checks or optimize_snr_checks:
                if optimize_snr_checks:
                    logging.info('Optimizing SNR for single above threshold ..')
                    # Tell snr optimized event about p_terr
                    if hasattr(event, 'p_terr') and event.p_terr is not None:
                        single['p_terr'] = event.p_terr
                live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]]
                thread_args = (
                    event, fname, comment, single, live_ifos, [ifo],
                    upload_checks, optimize_snr_checks, args.gracedb_server, 
                    args.gracedb_search,
                    )
                gdb_upload_thread = threading.Thread(target=self.upload_in_thread,
                                                     args=thread_args)
                gdb_upload_thread.start()

    def get_out_dir_path(self, time_gps):
        """Compose and return the path to a directory to store files associated
        with times in a given day (specified via a GPS time).
        Create the directory if it does not exist.
        """
        path = self.path
        if self.use_date_prefix:
            time_dt = Time(time_gps, format='gps', scale='utc').to_datetime()
            path = os.path.join(
                path,
                f'{time_dt.year:04d}_{time_dt.month:02d}_{time_dt.day:02d}'
            )
        makedir(path)
        return path

    def get_candidate_path(self, merger_time):
        """Compose and return the path to a directory to store files associated
        with a particular candidate event. Create the directory if it does not
        exist.
        """
        path = os.path.join(
            self.get_out_dir_path(merger_time),
            f'candidate_{merger_time:.3f}_{pycbc.random_string(4)}'
        )
        makedir(path)
        return path

    def dump(self, results, name, store_psd=False, time_index=None,
             store_loudest_index=False, raw_results=None, gates=None):
        """Save the results from this time block to an hdf output file.
        """
        fname = os.path.join(
            self.get_out_dir_path(time_index),
            name + '.hdf'
        )

        with h5py.File(fname, 'w') as f:
            f.attrs['pycbc_version'] = version.git_verbose_msg
            f.attrs['command_line'] = sys.argv
            f.attrs['num_live_detectors'] = len(self.live_detectors)

            def h5py_unicode_workaround(stuff):
                # workaround for the fact that h5py cannot handle Unicode
                # numpy arrays that show up with Python 3
                if hasattr(stuff, 'dtype') and stuff.dtype.kind == 'U':
                    return [s.encode() for s in stuff]
                return stuff

            for ifo in results:
                for k in results[ifo]:
                    f['%s/%s' % (ifo, k)] = \
                            h5py_unicode_workaround(results[ifo][k])

            for key in raw_results:
                f[key] = h5py_unicode_workaround(raw_results[key])

            if store_loudest_index:
                for ifo in results:
                    if 'snr' in results[ifo]:
                        s = numpy.array(results[ifo]['snr'], ndmin=1)
                        c = numpy.array(results[ifo]['chisq'], ndmin=1)
                        nsnr = numpy.array(newsnr(s, c), ndmin=1) if len(s) > 0 else []

                        # loudest by newsnr
                        nloudest = numpy.argsort(nsnr)[::-1][0:store_loudest_index]

                        # loudest by snr
                        sloudest = numpy.argsort(s)[::-1][0:store_loudest_index]
                        f[ifo]['loudest'] = numpy.union1d(nloudest, sloudest)

            for ifo in (gates or {}):
                gate_dtype = [('center_time', float),
                              ('zero_half_width', float),
                              ('taper_width', float)]
                f['{}/gates'.format(ifo)] = \
                        numpy.array(gates[ifo], dtype=gate_dtype)

        for ifo in (store_psd or {}):
            if store_psd[ifo] is not None:
                store_psd[ifo].save(fname, group='%s/psd' % ifo)


parser = argparse.ArgumentParser(description=__doc__)
pycbc.waveform.bank.add_approximant_arg(parser)
parser.add_argument('--verbose', action='store_true')
parser.add_argument('--version', action='version', version=version.git_verbose_msg)
parser.add_argument('--bank-file', required=True,
                    help="Template bank file in XML or HDF format")
parser.add_argument('--low-frequency-cutoff', help="low frequency cutoff", type=int)
parser.add_argument('--sample-rate', help="output sample rate", type=int)
parser.add_argument('--chisq-bins', help="Number of chisq bins")
parser.add_argument('--analysis-chunk', type=int, required=True,
                        help="Amount of data to produce triggers in a block")

parser.add_argument('--snr-threshold', type=float,
                    help='SNR threshold for generating a trigger')
parser.add_argument('--snr-abort-threshold', type=float)

parser.add_argument('--channel-name', action=MultiDetMultiColonOptionAction,
                    required=True)
parser.add_argument('--state-channel', action=MultiDetMultiColonOptionAction,
                    help="Channel containing frame status information. Used "
                         "to determine when to analyze the hoft data. This somewhat"
                         " corresponds to CAT1 information")
parser.add_argument('--analyze-flags', action=MultiDetOptionAction, nargs='+',
                    help='The flags that must be in the "good" state to analyze data')
parser.add_argument('--idq-channel', action=MultiDetMultiColonOptionAction,
                    help="Channel to read iDQ timeseries from. iDQ timeseries are "
                         "required by certain ranking statistics.")
parser.add_argument('--idq-state-channel', action=MultiDetMultiColonOptionAction,
                    help="Channel containing information about the state of idq."
                         "Used to determine when idq is usable for statistics. ")
parser.add_argument('--idq-threshold', type=float,
                    help='Threshold used to veto triggers at times of '
                    'low iDQ False Alarm Probability')
parser.add_argument('--data-quality-channel',
                    action=MultiDetMultiColonOptionAction,
                    help="Channel containing data quality information. Used "
                         "to determine when hoft may be suspect and may be used to "
                         "veto triggers or not analyze a segment of data. This "
                         "roughly corresponds to CAT2 information")
parser.add_argument('--data-quality-flags', action=MultiDetOptionAction, nargs='+',
                    help='Flags used to determine when to throw triggers away. '
                         'For each detector, give a comma-separated list of flags.')
parser.add_argument('--data-quality-padding', type=float, default=0,
                    help='Time in seconds around a bad dq time to additionally remove '
                         'triggers')
parser.add_argument('--frame-src', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--frame-type', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--force-update-cache', action='store_true')
parser.add_argument('--highpass-frequency', type=float,
                    help="Frequency to apply highpass filtering")
parser.add_argument('--highpass-reduction', type=float,
                    help="DB to reduce low frequencies")
parser.add_argument('--highpass-bandwidth', type=float,
                    help="Width of the highpass turnover region in Hz")
parser.add_argument('--psd-recalculate-difference', type=float, default=.01)
parser.add_argument('--psd-abort-difference', type=float, default=.20)
parser.add_argument('--psd-samples', type=int, required=True,
                    help="Number of PSD segments to use in the rolling estimate")
parser.add_argument('--psd-segment-length', type=int, required=True,
                    help="Length in seconds of each PSD segment")
parser.add_argument('--psd-inverse-length', type=float,
                    help="Length in time for the equivalent FIR filter")
parser.add_argument('--psd-recompute-length', type=positive_int, default=1,
                    help="If given only recompute the PSD after this number of "
                         "analysis chunks, whose length is set by the option "
                         "--analysis-chunk.")
parser.add_argument('--trim-padding', type=float, default=0.25,
                    help="Padding around the overwhitened analysis block")
parser.add_argument("--enable-bank-start-frequency", action='store_true',
                    help="Read the starting frequency of template waveforms"
                         " from the template bank")

parser.add_argument('--autogating-threshold', type=float, metavar='SIGMA',
                    help='If given, find and gate glitches '
                         'producing a deviation larger than '
                         'SIGMA in the whitened strain time '
                         'series.')
parser.add_argument('--autogating-pad', type=float, default=0.5,
                    metavar='SECONDS',
                    help='Ignore the given length of whitened '
                         'strain at the ends of a segment, to '
                         'avoid filters ringing.')
parser.add_argument('--autogating-cluster', type=float, default=1,
                    metavar='SECONDS',
                    help='Length of clustering window for '
                         'detecting glitches for autogating.')
parser.add_argument('--autogating-width', type=float, default=0.25,
                    metavar='SECONDS', help='Half-width of the gating window.')
parser.add_argument('--autogating-taper', type=float, metavar='SECONDS',
                    default=0.25,
                    help='Taper the strain before and after '
                         'each gating window over a duration '
                         'of SECONDS.')

parser.add_argument('--sync', action='store_true',
                    help="Imposes an MPI synchronization at each transfer of"
                         " single-detector triggers. Can help with debugging"
                         " and avoiding memory issues when running offline"
                         " analyses.")
parser.add_argument('--increment-update-cache', action=MultiDetOptionAction, nargs='+')
parser.add_argument('--frame-read-timeout', type=float, default=30)
parser.add_argument('--increment', type=int, default=8, metavar="T",
                    help="When generating the template waveforms, their"
                         " durations are binned such that T is their greatest"
                         " common divisor.")

parser.add_argument('--start-time', type=int, default=None,
                    help='Start the analysis at the given GPS time')
parser.add_argument('--end-time', type=int, default=numpy.inf,
                    help='Stop the analysis at the given GPS time')

parser.add_argument('--output-path', required=True,
                    help='Path to a directory to store results in')
parser.add_argument('--output-status', type=str, metavar='PATH',
                    help='If given, PyCBC Live will periodically write JSON '
                         'status info to PATH, so the analysis can be '
                         'monitored via Nagios')
parser.add_argument('--day-hour-output-prefix', action='store_true')
parser.add_argument('--store-psd', action='store_true')
parser.add_argument('--output-background', type=str, nargs='+',
                    help='Takes a period in seconds and a file path and dumps '
                         'the coinc backgrounds to that path with that period')
parser.add_argument('--output-background-n-loudest', type=int, default=10000,
                    help="If given an integer (assumed positive), it stores loudest n triggers"
                    "(not sorted) for each of the coinc background. If 0, all bkg will be dumped.")

parser.add_argument('--newsnr-threshold', type=float, default=0)
parser.add_argument('--max-batch-size', type=int, default=2**27)
parser.add_argument('--store-loudest-index', type=int, default=0)
parser.add_argument('--max-psd-abort-distance', type=float, default=numpy.inf,
                    help="Safety BNS horizon distance (in Mpc) above which a "
                    "detector's data is discarded.")
parser.add_argument('--min-psd-abort-distance', type=float, default=-numpy.inf,
                    help="Safety BNS horizon distance (in Mpc) below which a "
                    "detector's data is discarded.")
parser.add_argument('--max-triggers-in-batch', type=int, metavar="N",
                    help="Tells each matched-filtering process to only report "
                         "the loudest N single-detector triggers by SNR.")
parser.add_argument('--max-length', type=float,
                    help='Maximum duration of templates, used to set the data buffer size')

parser.add_argument('--enable-profiling', type=int, metavar='RANK',
                    help='Dump out profiling information from the MPI process'
                         ' with given rank at the end of program execution')

parser.add_argument('--enable-background-estimation', default=False, action='store_true')
parser.add_argument('--ifar-double-followup-threshold', type=float, required=True,
                    help='Inverse-FAR threshold to followup double coincs with'
                         'additional detectors')
parser.add_argument('--pvalue-combination-livetime', type=float, required=True,
                    help="Livetime used for p-value combination with followup "
                         "detectors, in years")
parser.add_argument('--enable-single-detector-background', action='store_true', default=False)

parser.add_argument('--enable-gracedb-upload', action='store_true', default=False,
                    help='Upload triggers to GraceDB')
parser.add_argument('--enable-production-gracedb-upload', action='store_true', default=False,
                    help='Do not mark triggers uploaded to GraceDB as test '
                         'events. This option should *only* be enabled in '
                         'production analyses!')
parser.add_argument('--enable-single-detector-upload', action='store_true', default=False,
                    help='Upload single ifo events to GraceDB')
parser.add_argument('--gracedb-server', metavar='URL',
                    help='URL of GraceDB server API for uploading events. '
                         'If not provided, the default URL is used.')
parser.add_argument('--gracedb-search', type=str, default='AllSky',
                    help='String going into the "search" field of the GraceDB '
                         'events')
parser.add_argument('--ifar-upload-threshold', type=float, required=True,
                    help='Inverse-FAR threshold for uploading candidate '
                         'triggers to GraceDB, in years.')
parser.add_argument('--file-prefix', default='Live')

parser.add_argument('--round-start-time', type=int, metavar='X',
                    help="Round up the start time to the nearest multiple of X"
                         " seconds. This is useful for forcing agreement "
                         " with frame file production.")
parser.add_argument('--size-override', type=int, metavar='N',
                    help="Override the internal MPI size layout. "
                         " Useful for debugging and running a portion of a bank")
parser.add_argument('--fftw-planning-limit', type=float,
                    help="Time in seconds to allow for a plan to be created")
parser.add_argument('--run-snr-optimization', action='store_true',
                    default=False,
                    help='Run spawned followup processes to maximize SNR for '
                         'any trigger uploaded to GraceDB')
parser.add_argument('--snr-opt-timeout', type=int, default=400, metavar='SECONDS',
                    help='Maximum allowed duration of followup process to maximize SNR')


scheme.insert_processing_option_group(parser)
LiveSingle.insert_args(parser)
fft.insert_fft_option_group(parser)
Coincer.insert_args(parser)
SingleDetSGChisq.insert_option_group(parser)
mchirp_area.insert_args(parser)
livepau.insert_live_pastro_option_group(parser)

args = parser.parse_args()
scheme.verify_processing_options(args, parser)
fft.verify_fft_options(args, parser)

if args.output_background is not None and len(args.output_background) != 2:
    parser.error('--output-background takes two parameters: period and path')
if not args.enable_gracedb_upload and args.enable_single_detector_upload:
    parser.error('You are not allowed to enable single ifo upload without the '
                 '--enable-gracedb-upload option!')

log_format = '%(asctime)s {} {} %(message)s'.format(platform.node(),
                                                    mpi.COMM_WORLD.Get_rank())
pycbc.init_logging(args.verbose, format=log_format)

ctx = scheme.from_cli(args)
fft.from_cli(args)

# Approximant guess of the total padding
valid_pad = args.analysis_chunk
total_pad = args.trim_padding * 2 + valid_pad
lfc = None if args.enable_bank_start_frequency else args.low_frequency_cutoff
bank = waveform.LiveFilterBank(
        args.bank_file, args.sample_rate, total_pad, low_frequency_cutoff=lfc,
        approximant=args.approximant, increment=args.increment)
if bank.min_f_lower < args.low_frequency_cutoff:
    parser.error('--low-frequency-cutoff ({} Hz) must not be larger than the '
                 'minimum f_lower across all templates '
                 '({} Hz)'.format(args.low_frequency_cutoff, bank.min_f_lower))

ifos = set(args.channel_name.keys())
logging.info('Analyzing data from detectors %s', ppdets(ifos))

evnt = LiveEventManager(args, bank)

# include MPI rank and functional description into proctitle
task_name = 'root' if evnt.rank == 0 else 'filtering'
setproctitle('PyCBC Live rank {:d} [{}]'.format(evnt.rank, task_name))

sg_chisq = SingleDetSGChisq.from_cli(args, bank, args.chisq_bins)

if args.size_override:
    evnt.size = args.size_override

# make sure we can talk to GraceDB
if evnt.rank == 0 and args.enable_gracedb_upload:
    evnt.create_gdb()
    response = evnt.gracedb.ping()
    logging.info('GraceDB ping response: %s %s time elapsed: %s',
            response.status, response.reason, response.elapsed.total_seconds())

# I'm not the root, so do some actual filtering.
with ctx:
    try:
        # Import system wisdom.
        if args.fftw_import_system_wisdom:
            fft.fftw.import_sys_wisdom()

        # Read specified user-provided wisdom files
        if args.fftw_input_float_wisdom_file is not None:
            fft.fftw.import_single_wisdom_from_filename(args.fftw_input_float_wisdom_file)

        if args.fftw_input_double_wisdom_file is not None:
            fft.fftw.import_double_wisdom_from_filename(args.fftw_input_double_wisdom_file)

        if args.fftw_planning_limit:
            fft.fftw.set_planning_limit(args.fftw_planning_limit)
    except (ValueError, RuntimeError) as e:
        print(e)
        exit()

    maxlen = args.psd_segment_length * (args.psd_samples // 2 + 1)
    if evnt.rank > 0:
        bank.table.sort(order='mchirp')
        waveforms = list(bank[evnt.rank-1::evnt.size-1])
        lengths = numpy.array([1.0 / wf.delta_f for wf in waveforms])
        psd_len = args.psd_segment_length * (args.psd_samples // 2 + 1)
        maxlen = max(lengths.max(), psd_len)
        mf = LiveBatchMatchedFilter(waveforms, args.snr_threshold,
                                    args.chisq_bins, sg_chisq,
                                    snr_abort_threshold=args.snr_abort_threshold,
                                    newsnr_threshold=args.newsnr_threshold,
                                    max_triggers_in_batch=args.max_triggers_in_batch,
                                    maxelements=args.max_batch_size)

    # Synchronize start time if not provided on the command line
    if not args.start_time:
        evnt.barrier()
        tnow = int(pycbc.gps_now()) if evnt.rank == 0 else None
        args.start_time = evnt.comm.bcast(tnow, root=0)

    if args.round_start_time:
        args.start_time = int(args.start_time / args.round_start_time + 1) *\
            args.round_start_time
        logging.info('Starting from: %s', args.start_time)

    # initialize the data readers for all detectors
    if args.max_length is not None:
        maxlen = args.max_length
    maxlen = int(maxlen)
    data_reader = {ifo: StrainBuffer.from_cli(ifo, args, maxlen)
                   for ifo in ifos}
    evnt.data_readers = data_reader

    # create single-detector background "estimators"
    if args.enable_single_detector_background and evnt.rank == 0:
        sngl_estimator = {ifo: LiveSingle.from_cli(args, ifo)
                          for ifo in ifos}

    # Create double coincident background estimator for every combo
    if args.enable_background_estimation and evnt.rank == 0:
        ifo_combos = itertools.combinations(ifos, 2)
        estimators = []
        for combo in ifo_combos:
            logging.info('Will calculate %s background', ppdets(combo, "-"))
            estimators.append(Coincer.from_cli(
                args, len(bank), args.analysis_chunk, list(combo)
            ))

        my_coinc_id = 999999
        def set_coinc_id(i):
            global my_coinc_id
            my_coinc_id = i
            c = estimators[my_coinc_id]
            setproctitle('PyCBC Live {} bg estimator'.format(
                    ppdets(c.ifos, '-')))

        def get_coinc(results):
            c = estimators[my_coinc_id]
            r = c.add_singles(results)
            logging.info('Coincs %i: %s-%s: %s in cbuffer', my_coinc_id,
                         c.ifos[0], c.ifos[1], c.coincs.index)
            return r

        def output_background(_):
            estim = estimators[my_coinc_id]
            bg_time = estim.background_time / lal.YRJUL_SI
            return estim.ifos, estim.coincs.data, bg_time

        coinc_pool = BroadcastPool(len(estimators))
        coinc_pool.allmap(set_coinc_id, range(len(estimators)))

    logging.info('Starting')

    if args.enable_profiling is not None and evnt.rank == args.enable_profiling:
        pr = cProfile.Profile()
        pr.enable()

    # main analysis loop
    data_end = lambda: data_reader[tuple(data_reader.keys())[0]].end_time
    last_bg_dump_time = int(data_end())
    psd_count = {ifo:0 for ifo in ifos}

    while data_end() < args.end_time:
        t1 = pycbc.gps_now()
        logging.info('Analyzing from %s', data_end())

        results = {}
        evnt.live_detectors = set()

        for ifo in ifos:
            results[ifo] = False
            status = data_reader[ifo].advance(
                valid_pad,
                timeout=args.frame_read_timeout
            )
            if status and psd_count[ifo] == 0:
                status = data_reader[ifo].recalculate_psd()
                psd_count[ifo] = args.psd_recompute_length - 1
            elif not status:
                psd_count[ifo] = 0
            else:
                psd_count[ifo] -= 1

            if data_reader[ifo].psd is not None:
                dist = data_reader[ifo].psd.dist
                if dist < args.min_psd_abort_distance or dist > args.max_psd_abort_distance:
                    logging.info("%s PSD dist %s outside acceptable range [%s, %s]",
                                 ifo, dist, args.min_psd_abort_distance,
                                 args.max_psd_abort_distance)
                    status = False

            if status is True:
                evnt.live_detectors.add(ifo)
                if evnt.rank > 0:
                    logging.info('Filtering %s', ifo)
                    results[ifo] = mf.process_data(data_reader[ifo])
            else:
                logging.info('Insufficient data for %s analysis', ifo)

        if evnt.rank > 0:
            evnt.commit_results((results, data_end()))
        else:
            psds = {ifo: data_reader[ifo].psd for ifo in data_reader
                         if data_reader[ifo].psd is not None}

            # Collect together the single detector triggers
            if evnt.size > 1:
                results, valid_end = evnt.gather_results()

            # veto detectors with different state between the root
            # and worker nodes (e.g. late frame files on one node only)
            detectors_with_results = set(results.keys())
            evnt.live_detectors.intersection_update(detectors_with_results)
            for ifo in detectors_with_results - evnt.live_detectors:
                results.pop(ifo)

            # Veto single detector triggers that fail the DQ vector
            # or have iDQ value below the threshold
            for ifo in results:
                if data_reader[ifo].dq is not None:
                    logging.info("Checking %s's DQ vector", ifo)
                    start = data_reader[ifo].start_time
                    times = results[ifo]['end_time']
                    idx = data_reader[ifo].dq.indices_of_flag(
                            start, valid_pad, times,
                            padding=data_reader[ifo].dq_padding)
                    logging.info('Keeping %d/%d %s triggers after DQ flags',
                                 len(idx), len(times), ifo)
                    for key in results[ifo]:
                        if len(results[ifo][key]):
                            results[ifo][key] = results[ifo][key][idx]
                if data_reader[ifo].idq is not None:
                    logging.info("Checking %s's iDQ information", ifo)
                    start = data_reader[ifo].start_time
                    times = results[ifo]['end_time']
                    idx = data_reader[ifo].idq.indices_of_flag(
                            start, valid_pad, times,
                            padding=data_reader[ifo].dq_padding)
                    logging.info('Keeping %d/%d %s triggers after iDQ',
                                 len(idx), len(times), ifo)
                    for key in results[ifo]:
                        if len(results[ifo][key]):
                            results[ifo][key] = results[ifo][key][idx]

            # Look for coincident triggers and do background estimation
            if args.enable_background_estimation:
                coinc_results = coinc_pool.broadcast(get_coinc, results)

                # Pick the best coinc in this chunk
                best_coinc = Coincer.pick_best_coinc(coinc_results)

                evnt.check_coincs(list(results.keys()), best_coinc, psds)

            # Check for singles
            if args.enable_single_detector_background:
                evnt.check_singles(results, psds)

            gates = {ifo: data_reader[ifo].gate_params for ifo in data_reader}

            # map the results file to an hdf file
            prefix = '{}-{}-{}-{}'.format(''.join(sorted(ifos)),
                                          args.file_prefix,
                                          data_end() - args.analysis_chunk,
                                          valid_pad)

            evnt.dump(results, prefix, time_index=data_end(),
                      store_psd=(psds if args.store_psd else False),
                      store_loudest_index=args.store_loudest_index,
                      raw_results=best_coinc, gates=gates)

            # dump the background if needed
            if args.output_background and \
                    data_end() - last_bg_dump_time > float(args.output_background[0]):
                last_bg_dump_time = int(data_end())
                bg_dists = coinc_pool.broadcast(output_background, None)
                bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(''.join(sorted(ifos)),
                                                           last_bg_dump_time)
                bg_fn = os.path.join(args.output_background[1], bg_fn)
                with h5py.File(bg_fn, 'w') as bgf:
                    for bg_ifos, bg_data, bg_time in bg_dists:
                        if args.output_background_n_loudest and \
                                args.output_background_n_loudest < (len(bg_data) - 1):
                            n_loudest = args.output_background_n_loudest
                            assert (n_loudest > 0), \
                                "We can only store positive int loudest triggers."
                            ds = bgf.create_dataset(
                                ','.join(sorted(bg_ifos)),
                                data=-numpy.partition(-bg_data, n_loudest)[:n_loudest],
                                compression='gzip'
                            )
                        else:
                            ds = bgf.create_dataset(
                                ','.join(sorted(bg_ifos)),
                                data=bg_data,
                                compression='gzip'
                            )
                        ds.attrs['background_time'] = bg_time
                    bgf.attrs['gps_time'] = last_bg_dump_time

            logging.info('Finished analyzing up to %s', data_end())

        if args.sync:
            evnt.barrier()
        tdiff = pycbc.gps_now() - t1
        lag = float(pycbc.gps_now() - data_end())
        logging.info('Took %1.2f, duty factor of %.2f, lag %.2f s, %d live detectors',
                     tdiff, tdiff / valid_pad, lag, len(evnt.live_detectors)
        )

        if args.output_status is not None and evnt.rank == 0:
            if lag > 120:
                status_intervals = [{'num_status': 2,
                                     'txt_status': 'CRITICAL: lag greater than 2 min',
                                     'start_sec': 0}]
            else:
                status_intervals = [{'num_status': 0,
                                     'txt_status': 'OK: No reported problems',
                                     'start_sec': 0},
                                    {'num_status': 1,
                                     'txt_status': 'WARNING: last report between 2 and 4 min ago',
                                     'start_sec': 120}]
            status_intervals.append({'num_status': 2,
                                     'txt_status': 'CRITICAL: last report more than 4 min ago',
                                     'start_sec': 240})
            status = {'author': 'Tito Dal Canton',
                      'email': 'tito.canton@ligo.org',
                      'created_gps': int(pycbc.gps_now()),
                      'status_intervals': status_intervals}
            try:
                with open(args.output_status, 'w') as status_fp:
                    json.dump(status, status_fp)
            except IOError:
                logging.error('I/O error writing status JSON file!')

if evnt.rank == 1:
    if args.fftw_output_float_wisdom_file:
        fft.fftw.export_single_wisdom_to_filename(args.fftw_output_float_wisdom_file)

    if args.fftw_output_double_wisdom_file:
        fft.fftw.export_double_wisdom_to_filename(args.fftw_output_double_wisdom_file)

if args.enable_profiling is not None and evnt.rank == args.enable_profiling:
    pr.dump_stats(f'profiling_rank_{evnt.rank:03d}')
back to top