https://github.com/gwastro/pycbc
Tip revision: d2c91525c6b3660773d112cc2de2e8a3ed3b55d0 authored by Duncan Brown on 29 September 2015, 01:13:42 UTC
Set release version to 1.2.0
Set release version to 1.2.0
Tip revision: d2c9152
inject.py
# Copyright (C) 2012 Alex Nitz, Tito Dal Canton
#
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""This module provides utilities for injecting signals into data
"""
import numpy as np
import lal
import lalsimulation as sim
from pycbc.waveform import get_td_waveform, utils as wfutils
from glue.ligolw import utils as ligolw_utils
from glue.ligolw import ligolw, table, lsctables
from pycbc.types import float64, float32, TimeSeries
from pycbc.detector import Detector
injection_func_map = {
np.dtype(float32): sim.SimAddInjectionREAL4TimeSeries,
np.dtype(float64): sim.SimAddInjectionREAL8TimeSeries
}
# dummy class needed for loading LIGOLW files
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
lsctables.use_in(LIGOLWContentHandler)
def legacy_approximant_name(apx):
""" Convert the old style xml approximant name to a name
and phase_order. Alex: I hate this function. Please delet this when we
use Collin's new tables.
"""
apx = str(apx)
try:
order = sim.GetOrderFromString(apx)
except:
print("Warning: Could not read phase order from string, using default")
order = -1
name = sim.GetStringFromApproximant(sim.GetApproximantFromString(apx))
return name, order
class InjectionSet(object):
"""Manages sets of injections: reads injections from LIGOLW XML files
and injects them into time series.
Parameters
----------
sim_file : string
Path to a LIGOLW XML file containing a SimInspiralTable
with injection definitions.
Attributes
----------
indoc
table
"""
def __init__(self, sim_file, **kwds):
self.indoc = ligolw_utils.load_filename(
sim_file, False, contenthandler=LIGOLWContentHandler)
self.table = table.get_table(
self.indoc, lsctables.SimInspiralTable.tableName)
self.extra_args = kwds
def getswigrow(self, glue_row):
"""Translates glue row from the table to libmetaio row"""
import lalmetaio as lmt
swigrow = lmt.SimInspiralTable()
for simattr in lsctables.SimInspiralTable.validcolumns.keys():
if simattr in ["waveform", "source", "numrel_data", "taper"]:
setattr( swigrow, simattr, str(getattr(glue_row, simattr)) )
else:
setattr( swigrow, simattr, getattr(glue_row, simattr) )
swigrow.geocent_end_time.gpsNanoSeconds = glue_row.geocent_end_time_ns
return swigrow
def apply(self, strain, detector_name, f_lower=None, distance_scale=1,
simulation_ids=None):
"""Add injections (as seen by a particular detector) to a time series.
Parameters
----------
strain : TimeSeries
Time series to inject signals into, of type float32 or float64.
detector_name : string
Name of the detector used for projecting injections.
f_lower : {None, float}, optional
Low-frequency cutoff for injected signals. If None, use value
provided by each injection.
distance_scale: {1, float}, optional
Factor to scale the distance of an injection with. The default is
no scaling.
simulation_ids: iterable, optional
If given, only inject signals with the given simulation IDs.
Returns
-------
None
Raises
------
TypeError
For invalid types of `strain`.
"""
if not strain.dtype in (float32, float64):
raise TypeError("Strain dtype must be float32 or float64, not " \
+ str(strain.dtype))
lalstrain = strain.lal()
detector = Detector(detector_name)
earth_travel_time = lal.REARTH_SI / lal.C_SI
t0 = float(strain.start_time) - earth_travel_time
t1 = float(strain.end_time) + earth_travel_time
# pick lalsimulation injection function
add_injection = injection_func_map[strain.dtype]
injections = self.table
if simulation_ids:
injections = [inj for inj in injections \
if inj.simulation_id in simulation_ids]
for inj in injections:
if f_lower is None:
f_l = inj.f_lower
else:
f_l = f_lower
if inj.numrel_data != None and inj.numrel_data != "":
# performing NR waveform injection
# reading Hp and Hc from the frame files
swigrow = self.getswigrow(inj)
import lalinspiral
Hp, Hc = lalinspiral.NRInjectionFromSimInspiral(swigrow,
strain.delta_t)
# converting to pycbc timeseries
hp = TimeSeries(Hp.data.data[:], delta_t=Hp.deltaT,
epoch=Hp.epoch)
hc = TimeSeries(Hc.data.data[:], delta_t=Hc.deltaT,
epoch=Hc.epoch)
hp /= distance_scale
hc /= distance_scale
end_time = float(hp.get_end_time())
start_time = float(hp.get_start_time())
if end_time < t0 or start_time > t1:
continue
else:
# roughly estimate if the injection may overlap with the segment
end_time = inj.get_time_geocent()
inj_length = sim.SimInspiralTaylorLength(
strain.delta_t, inj.mass1 * lal.MSUN_SI,
inj.mass2 * lal.MSUN_SI, f_l, 0)
start_time = end_time - 2 * inj_length
if end_time < t0 or start_time > t1:
continue
name, phase_order = legacy_approximant_name(inj.waveform)
# compute the waveform time series
hp, hc = get_td_waveform(
inj, approximant=name, delta_t=strain.delta_t,
phase_order=phase_order,
f_lower=f_l, distance=inj.distance * distance_scale,
**self.extra_args)
hp._epoch += float(end_time)
hc._epoch += float(end_time)
if float(hp.start_time) > t1:
continue
# taper the polarizations
hp_tapered = wfutils.taper_timeseries(hp, inj.taper)
hc_tapered = wfutils.taper_timeseries(hc, inj.taper)
# compute the detector response and add it to the strain
signal = detector.project_wave(hp_tapered, hc_tapered,
inj.longitude, inj.latitude, inj.polarization)
signal = signal.astype(strain.dtype)
signal_lal = signal.lal()
add_injection(lalstrain, signal_lal, None)
strain.data[:] = lalstrain.data.data[:]
def end_times(self):
""" Return the end times of all injections
"""
return [inj.get_time_geocent() for inj in self.table]
def maximum_snrs(self, psd):
""" Calculate the maximum SNR (without noise), for each signal in the
injection set, using the given PSD.
Parameters
----------
Returns
-------
None
"""
raise NotImplementedError("This is not yet implemented")
def expected_snrs(self, template, psd):
""" Calculate the expected SNR (without noise), for each signal in the
injection set, using the given PSD, when matched with the given template.
Parameters
----------
Returns
-------
None
"""
raise NotImplementedError("This is not yet implemented")
class SGBurstInjectionSet(object):
"""Manages sets of sine-Gaussian burst injections: reads injections
from LIGOLW XML files and injects them into time series.
Parameters
----------
sim_file : string
Path to a LIGOLW XML file containing a SimBurstTable
with injection definitions.
Attributes
----------
indoc
table
"""
def __init__(self, sim_file, **kwds):
self.indoc = ligolw_utils.load_filename(
sim_file, False, contenthandler=LIGOLWContentHandler)
self.table = table.get_table(
self.indoc, lsctables.SimBurstTable.tableName)
self.extra_args = kwds
def getswigrow(self, glue_row):
"""Translates glue row from the table to libmetaio row"""
import lalmetaio as lmt
swigrow = lmt.SimBurstTable()
for simattr in lsctables.SimBurstTable.validcolumns.keys():
if simattr in ["waveform"]:
setattr( swigrow, simattr, str(getattr(glue_row, simattr)) )
else:
setattr( swigrow, simattr, getattr(glue_row, simattr) )
swigrow.geocent_end_time.gpsNanoSeconds = glue_row.geocent_end_time_ns
return swigrow
def apply(self, strain, detector_name, f_lower=None, distance_scale=1):
"""Add injections (as seen by a particular detector) to a time series.
Parameters
----------
strain : TimeSeries
Time series to inject signals into, of type float32 or float64.
detector_name : string
Name of the detector used for projecting injections.
f_lower : {None, float}, optional
Low-frequency cutoff for injected signals. If None, use value
provided by each injection.
distance_scale: {1, foat}, optional
Factor to scale the distance of an injection with. The default is
no scaling.
Returns
-------
None
Raises
------
TypeError
For invalid types of `strain`.
"""
if not strain.dtype in (float32, float64):
raise TypeError("Strain dtype must be float32 or float64, not " \
+ str(strain.dtype))
lalstrain = strain.lal()
detector = Detector(detector_name)
earth_travel_time = lal.REARTH_SI / lal.C_SI
t0 = float(strain.start_time) - earth_travel_time
t1 = float(strain.end_time) + earth_travel_time
# pick lalsimulation tapering function
taper = taper_func_map[strain.dtype]
# pick lalsimulation injection function
add_injection = injection_func_map[strain.dtype]
for inj in self.table:
# roughly estimate if the injection may overlap with the segment
end_time = inj.get_time_geocent()
#CHECK: This is a hack (10.0s); replace with an accurate estimate
inj_length = 10.0
eccentricity = 0.0
polarization = 0.0
start_time = end_time - 2 * inj_length
if end_time < t0 or start_time > t1:
continue
# compute the waveform time series
hp, hc = sim.SimBurstSineGaussian(float(inj.q),
float(inj.frequency),float(inj.hrss),float(eccentricity),
float(polarization),float(strain.delta_t))
hp = TimeSeries(hp.data.data[:], delta_t=hp.deltaT, epoch=hp.epoch)
hc = TimeSeries(hc.data.data[:], delta_t=hc.deltaT, epoch=hc.epoch)
hp._epoch += float(end_time)
hc._epoch += float(end_time)
if float(hp.start_time) > t1:
continue
# compute the detector response, taper it if requested
# and add it to the strain
#signal = detector.project_wave(
# hp, hc, inj.longitude, inj.latitude, inj.polarization)
signal_lal = hp.astype(strain.dtype).lal()
if taper_map['TAPER_NONE'] is not None:
taper(signal_lal.data, taper_map['TAPER_NONE'])
add_injection(lalstrain, signal_lal, None)
strain.data[:] = lalstrain.data.data[:]
def maximum_snrs(self, psd):
""" Calculate the maximum SNR (without noise), for each signal in the
injection set, using the given PSD.
Parameters
----------
Returns
-------
None
"""
raise NotImplementedError("This is not yet implemented")
def expected_snrs(self, template, psd):
""" Calculate the expected SNR (without noise), for each signal in the
injection set, using the given PSD, when matched with the given template.
Parameters
----------
Returns
-------
None
"""
raise NotImplementedError("This is not yet implemented")