Revision 2ac88b212928958f4eb9184e0284f01ada77ef1b authored by Collin Capano on 31 October 2017, 08:03:33 UTC, committed by GitHub on 31 October 2017, 08:03:33 UTC
* give meaningful error when priors section missing * comment out unused variables in example ini file * fix update for travis
1 parent e3b8d4a
test_tmpltbank.py
# Copyright (C) 2013 Ian Harry
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
These are the unittests for the pycbc.tmpltbank module
"""
from __future__ import division
import os
import numpy
import pycbc.tmpltbank
import pycbc.psd
import pycbc.pnutils
from pycbc import pnutils
from pycbc.types import Array
import difflib
import sys
import matplotlib
matplotlib.use('Agg')
import pylab
import unittest
from utils import parse_args_cpu_only, simple_exit
# This will return whatever is appropriate, depending on whether this
# particular instance of the unittest was called for CPU, CUDA, or OpenCL
parse_args_cpu_only("Template bank module")
import argparse
parser = argparse.ArgumentParser()
def update_mass_parameters(tmpltbank_class):
"""
Choose various sets of mass parameters for testing.
"""
num_comp_masses = 3
min_mass1 = [1,2,6]
max_mass1 = [5,8,12]
min_mass2 = [1,1,1]
max_mass2 = [5,5,5]
num_tot_masses = 3
# These *must* be provided
min_tot_mass = [None, 2.5, 3.5]
max_tot_mass = [None, 11, 7.5]
num_chirp_masses = 3
max_chirp_mass = [None, 2.43, 3.5]
min_chirp_mass = [None, 1.218, 2.43]
num_etas = 3
max_eta = [0.25, 0.24, 0.23]
min_eta = [None, 0.16, 0.17]
max_iter_idx = num_comp_masses * num_tot_masses *\
num_chirp_masses * num_etas
for idx in xrange(max_iter_idx):
comp_masses_idx = idx % num_comp_masses
tmpltbank_class.min_mass1 = min_mass1[comp_masses_idx]
tmpltbank_class.max_mass1 = max_mass1[comp_masses_idx]
tmpltbank_class.min_mass2 = min_mass2[comp_masses_idx]
tmpltbank_class.max_mass2 = max_mass2[comp_masses_idx]
reduced_idx = idx // num_comp_masses
tot_mass_idx = reduced_idx % num_tot_masses
tmpltbank_class.min_total_mass = min_tot_mass[tot_mass_idx]
tmpltbank_class.max_total_mass = max_tot_mass[tot_mass_idx]
reduced_idx = reduced_idx // num_tot_masses
chirp_mass_idx = reduced_idx % num_chirp_masses
tmpltbank_class.min_chirp_mass = min_chirp_mass[chirp_mass_idx]
tmpltbank_class.max_chirp_mass = max_chirp_mass[chirp_mass_idx]
reduced_idx = reduced_idx // num_chirp_masses
eta_idx = reduced_idx
tmpltbank_class.max_eta = max_eta[eta_idx]
tmpltbank_class.min_eta = min_eta[eta_idx]
yield idx
return
class TmpltbankTestClass(unittest.TestCase):
def setUp(self):
# Where are my data files?
if os.path.isfile('test/data/ZERO_DET_high_P.txt'):
self.dataDir = 'test/data/'
elif os.path.isfile('data/ZERO_DET_high_P.txt'):
self.dataDir = 'data/'
else:
self.assertTrue(False, msg="Cannot find data files!")
self.deltaF = 0.1
self.f_low = 15
self.f_upper = 2000
self.f0 = 70
self.sampleRate = 4096
self.pnOrder = 'threePointFivePN'
self.min_mass1 = 1
self.min_mass2 = 1
self.max_mass1 = 5
self.max_mass2 = 5
self.max_ns_spin_mag = 0.5
self.max_bh_spin_mag = 0.9
self.ns_bh_boundary_mass = 2.0
self.min_total_mass = 2.5
self.max_total_mass = 6.0
self.max_chirp_mass = 2.4375415772291475
self.min_chirp_mass = 1.2187707886145738
self.max_eta = 0.24
self.min_eta = 0.16
# Sanity check these
pycbc.tmpltbank.verify_mass_range_options(self, parser=parser)
# Need to use F2 metric for ethinca
self.ethincaOrder = 'threePointFivePN'
self.ethincaCutoff = 'SchwarzISCO'
self.ethincaFreqStep = 10.
self.segLen = 1./self.deltaF
self.psdSize = int(self.segLen * self.sampleRate / 2.) + 1
self.psd = pycbc.psd.from_txt('%sZERO_DET_high_P.txt' %(self.dataDir),\
self.psdSize, self.deltaF, self.f_low, is_asd_file=True)
metricParams = pycbc.tmpltbank.metricParameters(self.pnOrder,\
self.f_low, self.f_upper, self.deltaF, self.f0)
metricParams.psd = self.psd
massRangeParams = pycbc.tmpltbank.massRangeParameters(self.min_mass1,\
self.max_mass1, self.min_mass2, self.max_mass2,\
maxNSSpinMag=self.max_ns_spin_mag,\
maxBHSpinMag=self.max_bh_spin_mag,\
maxTotMass=self.max_total_mass,\
minTotMass=self.min_total_mass,\
max_chirp_mass=self.max_chirp_mass,\
min_chirp_mass=self.min_chirp_mass,\
maxEta=self.max_eta,\
minEta=self.min_eta,\
ns_bh_boundary_mass=self.ns_bh_boundary_mass)
# And again with the nsbh flag
massRangeParams2 = pycbc.tmpltbank.massRangeParameters(self.min_mass1,\
self.max_mass1, self.min_mass2, self.max_mass2,\
maxNSSpinMag=self.max_ns_spin_mag,\
maxBHSpinMag=self.max_bh_spin_mag,\
maxTotMass=self.max_total_mass,\
minTotMass=self.min_total_mass,\
max_chirp_mass=self.max_chirp_mass,\
min_chirp_mass=self.min_chirp_mass,\
maxEta=self.max_eta,\
minEta=self.min_eta,\
nsbhFlag=True)
metricParams = pycbc.tmpltbank.determine_eigen_directions(metricParams)
vals=pycbc.tmpltbank.estimate_mass_range(100000, massRangeParams,\
metricParams, self.f_upper, covary=False)
cov = numpy.cov(vals)
_,self.evecsCV = numpy.linalg.eig(cov)
metricParams.evecsCV = {}
metricParams.evecsCV[self.f_upper] = self.evecsCV
vals=pycbc.tmpltbank.estimate_mass_range(100000, massRangeParams,\
metricParams, self.f_upper, covary=False)
self.metricParams = metricParams
self.massRangeParams = massRangeParams
self.massRangeParams2 = massRangeParams2
self.ethincaParams = pycbc.tmpltbank.ethincaParameters(
self.ethincaOrder, self.ethincaCutoff, self.ethincaFreqStep,
full_ethinca=False, time_ethinca=False)
self.xis = vals
def test_eigen_directions(self):
evalsStock = Array(numpy.loadtxt('%sstockEvals.dat'%(self.dataDir)))
evecsStock = Array(numpy.loadtxt('%sstockEvecs.dat'%(self.dataDir)))
maxEval = max(evalsStock)
evalsCurr = Array(self.metricParams.evals[self.f_upper])
evecsCurr = Array(self.metricParams.evecs[self.f_upper])
errMsg = "pycbc.tmpltbank.determine_eigen_directions has failed "
errMsg += "sanity check."
evalsDiff = abs(evalsCurr - evalsStock)/maxEval
self.assertTrue(not (evalsDiff > 1E-5).any(), msg=errMsg)
for stock,test in zip(evecsStock.data,evecsCurr.data):
stockScaled = stock * evalsCurr.data**0.5
testScaled = test * evalsCurr.data**0.5
diff = stockScaled - testScaled
self.assertTrue(not (diff > 1E-4).any(), msg=errMsg)
def test_get_random_mass(self):
# Want to do this for a variety of mass combinations
for i in update_mass_parameters(self):
curr_min_mass = self.min_total_mass
curr_max_mass = self.max_total_mass
try:
pycbc.tmpltbank.verify_mass_range_options(self, parser=parser)
except ValueError:
# Some of the inputs are unphysical and will fail.
# These cases are known to fail, the inputs are unphysical
# 35 has inconsistent total mass and eta restrictions
# 38 Component mass, [upper] chirp mass and [lower] eta limits
# rule out the entire space.
# 41 Same as 38
# 44 Same as 38
# 62 From component mass and total mass limits only total masses
# between 7 and 7.5 are possible. This range all has eta
# lower than the limit of 0.17.
# 65 Same as 38
# 68 Same as 38
# 71 Same as 38
# 80 Same as 62
if i in [35,38,41,44,62,65,68,71,80]:
continue
raise
# Check that if the mass limits have changed, it was right to do so
# This is not exhaustive, but gets most things
if not self.min_total_mass == curr_min_mass:
min_comp_mass = self.min_mass1 + self.min_mass2
min_eta = self.min_mass1 * self.min_mass2 /\
(min_comp_mass * min_comp_mass)
min_chirp_mass = min_comp_mass * min_eta**(3./5.)
if self.min_total_mass == min_comp_mass:
# Okay, the total mass is changed by the components
pass
elif min_eta < self.min_eta or min_eta > self.max_eta:
# Okay, not possible from eta
pass
elif min_chirp_mass < self.min_chirp_mass:
# Okay, not possible from chirp mass
pass
else:
err_msg = "Minimum total mass changed unexpectedly."
print(self.min_total_mass, curr_min_mass)
print(self.min_mass1, self.min_mass2, min_comp_mass)
print(min_eta, self.min_eta, self.max_eta)
print(min_chirp_mass, self.min_chirp_mass)
self.fail(err_msg)
if not self.max_total_mass == curr_max_mass:
max_comp_mass = self.max_mass1 + self.max_mass2
max_eta = self.max_mass1 * self.max_mass2 /\
(max_comp_mass * max_comp_mass)
max_chirp_mass = max_comp_mass * max_eta**(3./5.)
if self.max_total_mass == max_comp_mass:
# Okay, the total mass is changed by the components
pass
elif max_eta < self.min_eta or max_eta > self.max_eta:
# Okay, not possible from eta
pass
elif max_chirp_mass > self.max_chirp_mass:
# Okay, not possible from chirp mass
pass
else:
err_msg = "Maximum total mass changed unexpectedly."
self.fail(err_msg)
massRangeParams = pycbc.tmpltbank.massRangeParameters(\
self.min_mass1,\
self.max_mass1, self.min_mass2, self.max_mass2,\
maxNSSpinMag=self.max_ns_spin_mag,\
maxBHSpinMag=self.max_bh_spin_mag,\
maxTotMass=self.max_total_mass,\
minTotMass=self.min_total_mass,\
max_chirp_mass=self.max_chirp_mass,\
min_chirp_mass=self.min_chirp_mass,\
maxEta=self.max_eta,\
minEta=self.min_eta,\
ns_bh_boundary_mass=self.ns_bh_boundary_mass)
# And again with the nsbh flag
massRangeParams2 = pycbc.tmpltbank.massRangeParameters(\
self.min_mass1,\
self.max_mass1, self.min_mass2, self.max_mass2,\
maxNSSpinMag=self.max_ns_spin_mag,\
maxBHSpinMag=self.max_bh_spin_mag,\
maxTotMass=self.max_total_mass,\
minTotMass=self.min_total_mass,\
max_chirp_mass=self.max_chirp_mass,\
min_chirp_mass=self.min_chirp_mass,\
maxEta=self.max_eta,\
minEta=self.min_eta,\
nsbhFlag=True)
mass1, mass2, spin1z, spin2z = \
pycbc.tmpltbank.get_random_mass(100000, massRangeParams)
mass = mass1 + mass2
errMsg = "pycbc.tmpltbank.get_random_mass returns invalid ranges."
self.assertTrue(not (mass < self.min_total_mass).any(),msg=errMsg)
self.assertTrue(not (mass > self.max_total_mass).any(),msg=errMsg)
self.assertTrue(not (mass1 > self.max_mass1 * 1.001).any(),
msg=errMsg)
self.assertTrue(not (mass1 < self.min_mass1 * 0.999).any(),
msg=errMsg)
self.assertTrue(not (mass2 > self.max_mass2 * 1.001).any(),
msg=errMsg)
self.assertTrue(not (mass2 < self.min_mass2 * 0.999).any(),
msg=errMsg)
self.assertTrue(not (mass1 < mass2).any(),msg=errMsg)
# Chirp mass and eta
mchirp, eta = pnutils.mass1_mass2_to_mchirp_eta(mass1,mass2)
if self.max_chirp_mass:
self.assertTrue(not (mchirp > self.max_chirp_mass*1.0001).any(),
msg=errMsg)
if self.min_chirp_mass:
self.assertTrue(not (mchirp < self.min_chirp_mass*0.9999).any(),
msg=errMsg)
if self.min_eta:
self.assertTrue(not (eta < self.min_eta*0.9999).any(),
msg=errMsg)
self.assertTrue(not (eta > self.max_eta*1.0001).any(),
msg=errMsg)
nsSpin1 = spin1z[mass1 < self.ns_bh_boundary_mass]
nsSpin2 = spin2z[mass2 < self.ns_bh_boundary_mass]
bhSpin1 = spin1z[mass1 > self.ns_bh_boundary_mass]
bhSpin2 = spin2z[mass2 > self.ns_bh_boundary_mass]
self.assertTrue(not (abs(nsSpin1) > 0.5).any(), msg=errMsg)
self.assertTrue(not (abs(nsSpin2) > 0.5).any(), msg=errMsg)
self.assertTrue(not (abs(bhSpin1) > 0.9).any(), msg=errMsg)
self.assertTrue(not (abs(bhSpin2) > 0.9).any(), msg=errMsg)
# Check that *some* spins are bigger than 0.5
if len(bhSpin1):
self.assertTrue((abs(bhSpin1) > 0.5).any(), msg=errMsg)
if len(bhSpin2):
self.assertTrue((abs(bhSpin2) > 0.5).any(), msg=errMsg)
# This can be used to test the boundaries are all applied visually
# FIXME: This would need to be updated for all of the mass limits
#pylab.plot(mass1, mass2, 'b.')
#pylab.plot([3.0,5.0],[3.0,1.0],'r-')
#pylab.plot([1.7216566400945545,1.9921146662296347,3.4433132801891091,4.01175560949798],[1.1477710933963694,1.0066274466204264,2.2955421867927388,1.002938902374495],'mx')
#pylab.ylim([0.8,3.0])
#pylab.xlim([1.5,5.0])
#pylab.savefig("masses_%d.png" %(idx,))
# Check nsbh flag
mass1, mass2, spin1z, spin2z = \
pycbc.tmpltbank.get_random_mass(100000, massRangeParams2)
self.assertTrue(not (abs(spin1z) > 0.9).any(), msg=errMsg)
self.assertTrue(not (abs(spin2z) > 0.5).any(), msg=errMsg)
self.assertTrue((abs(spin1z) > 0.5).any(), msg=errMsg)
def test_chirp_params(self):
chirps=pycbc.tmpltbank.get_chirp_params(2.2, 1.8, 0.2, 0.3,
self.metricParams.f0, self.metricParams.pnOrder)
stockChirps = numpy.loadtxt('%sstockChirps.dat'%(self.dataDir))
diff = (chirps - stockChirps) / stockChirps
errMsg = "Calculated chirp params differ from that expected."
self.assertTrue( not (diff > 1E-4).any(), msg=errMsg)
def test_hexagonal_placement(self):
arrz = pycbc.tmpltbank.generate_hexagonal_lattice(10, 0, 10, 0, 0.03)
arrz = numpy.array(arrz)
stockGrid = numpy.loadtxt("%sstockHexagonal.dat"%(self.dataDir))
diff = arrz - stockGrid
errMsg = "Calculated lattice differs from that expected."
self.assertTrue( not (diff > 1E-4).any(), msg=errMsg)
def test_anstar_placement(self):
arrz = pycbc.tmpltbank.generate_anstar_3d_lattice(0, 10, 0, 10, 0, \
10, 0.03)
arrz = numpy.array(arrz)
stockGrid = numpy.loadtxt("%sstockAnstar3D.dat"%(self.dataDir))
numpy.savetxt("new_example.dat", arrz)
errMsg = "Calculated lattice differs from that expected."
self.assertTrue(len(arrz) == len(stockGrid), msg=errMsg)
diff = arrz - stockGrid
self.assertTrue( not (diff > 1E-4).any(), msg=errMsg)
def test_get_mass_distribution(self):
# Just run the function, no checking output
pycbc.tmpltbank.get_mass_distribution([1.35,0.239,0.4,-0.2], 2, \
self.massRangeParams, self.metricParams, \
self.f_upper, \
numJumpPoints=123, chirpMassJumpFac=0.0002, \
etaJumpFac=0.009, spin1zJumpFac=0.1, \
spin2zJumpFac=0.2)
def test_get_phys_cov_masses(self):
evecs = self.metricParams.evecs[self.f_upper]
evals = self.metricParams.evals[self.f_upper]
masses1 = [2.2,1.8,0.4,0.3]
masses2 = [2.21,1.79,0.41,0.29]
xis1 = pycbc.tmpltbank.get_cov_params(masses1[0], masses1[1],
masses1[2], masses1[3], self.metricParams, self.f_upper)
xis2 = pycbc.tmpltbank.get_cov_params(masses2[0], masses2[1],
masses2[2], masses2[3], self.metricParams, self.f_upper)
testXis = [xis1[0],xis1[1]]
b_mtot, b_eta = pnutils.mass1_mass2_to_mtotal_eta(masses2[0],
masses2[1])
bestMasses = [b_mtot, b_eta, masses2[2], masses2[3]]
bestXis = xis2
output = pycbc.tmpltbank.get_physical_covaried_masses(testXis, \
bestMasses, bestXis, 0.0001, self.massRangeParams, \
self.metricParams, self.f_upper)
# Test that returned xis are close enough
diff = (output[6][0] - testXis[0])**2
diff += (output[6][1] - testXis[1])**2
errMsg = 'pycbc.tmpltbank.get_physical_covaried_masses '
errMsg += 'failed to find a point within the desired limits.'
self.assertTrue( diff < 1E-4,msg=errMsg)
# Test that returned masses and xis agree
massT = output[0] + output[1]
etaT = output[0]*output[1] / (massT*massT)
spinSetT = pycbc.pnutils.get_beta_sigma_from_aligned_spins(\
etaT, output[2], output[3])
xisT = pycbc.tmpltbank.get_cov_params(output[0], output[1],
output[2], output[3], self.metricParams, self.f_upper)
errMsg = "Recovered xis do not agree with those expected."
self.assertTrue( abs(xisT[0] - output[6][0]) < 1E-5, msg=errMsg)
self.assertTrue( abs(xisT[1] - output[6][1]) < 1E-5, msg=errMsg)
self.assertTrue( abs(xisT[2] - output[6][2]) < 1E-5, msg=errMsg)
self.assertTrue( abs(xisT[3] - output[6][3]) < 1E-5, msg=errMsg)
# Test again with nsbh flag on
output = pycbc.tmpltbank.get_physical_covaried_masses(testXis, \
bestMasses, bestXis, 0.0001, self.massRangeParams2, \
self.metricParams, self.f_upper)
# Test that returned xis are close enough
diff = (output[6][0] - testXis[0])**2
diff += (output[6][1] - testXis[1])**2
errMsg = 'pycbc.tmpltbank.get_physical_covaried_masses '
errMsg += 'failed to find a point within the desired limits.'
self.assertTrue( diff < 1E-4,msg=errMsg)
# Test that returned masses and xis agree
xisT = pycbc.tmpltbank.get_cov_params(output[0], output[1],
output[2], output[3], self.metricParams, self.f_upper)
errMsg = "Recovered xis do not agree with those expected."
self.assertTrue( abs(xisT[0] - output[6][0]) < 1E-5, msg=errMsg)
self.assertTrue( abs(xisT[1] - output[6][1]) < 1E-5, msg=errMsg)
self.assertTrue( abs(xisT[2] - output[6][2]) < 1E-5, msg=errMsg)
self.assertTrue( abs(xisT[3] - output[6][3]) < 1E-5, msg=errMsg)
def test_stack_xi_direction(self):
# Just run the function, no checking output
evecs = self.metricParams.evecs[self.f_upper]
evals = self.metricParams.evals[self.f_upper]
masses1 = [2.2,1.8,0.4,0.3]
masses2 = [2.21,1.79,0.41,0.29]
xis1 = pycbc.tmpltbank.get_cov_params(masses1[0], masses1[1], \
masses1[2], masses1[3], self.metricParams, self.f_upper)
xis2 = pycbc.tmpltbank.get_cov_params(masses2[0], masses2[1], \
masses2[2], masses2[3], self.metricParams, self.f_upper)
testXis = [xis1[0],xis1[1]]
b_mtot, b_eta = pnutils.mass1_mass2_to_mtotal_eta(masses2[0],
masses2[1])
bestMasses = [b_mtot, b_eta, masses2[2], masses2[3]]
bestXis = xis2
depths = pycbc.tmpltbank.stack_xi_direction_brute(testXis, \
bestMasses, bestXis, 3, 0.03, self.massRangeParams, \
self.metricParams, self.f_upper, numIterations=50)
def test_point_distance(self):
masses1 = [2,2,0.4,0.6]
masses2 = [2.02,1.97,0.41,0.59]
dist, xis1, xis2 = pycbc.tmpltbank.get_point_distance(masses1, \
masses2, self.metricParams, self.f_upper)
diff = abs((dist - 23.3560790221) / dist)
errMsg = "Obtained distance does not agree with expected value."
self.assertTrue( diff < 1E-5, msg=errMsg)
def test_conv_to_sngl(self):
# Just run the function, no checking output
masses1 = [(2,2,0.4,0.3),(4.01,0.249,0.41,0.29)]
pycbc.tmpltbank.convert_to_sngl_inspiral_table(masses1, "a")
def test_ethinca_calc(self):
# Just run the function, no checking output
m1 = 2.
m2 = 2.
s1z = 0.
s2z = 0.
# ethinca calc breaks unless f0 = fLow
self.metricParams.f0 = self.metricParams.fLow
output = pycbc.tmpltbank.calculate_ethinca_metric_comps(
self.metricParams, self.ethincaParams, m1, m2, s1z, s2z)
# restore initial f0 value
self.metricParams.f0 = self.f0
def tearDown(self):
pass
suite = unittest.TestSuite()
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TmpltbankTestClass))
if __name__ == '__main__':
results = unittest.TextTestRunner(verbosity=2).run(suite)
simple_exit(results)
![swh spinner](/static/img/swh-spinner.gif)
Computing file changes ...