Raw File
import copy
import unittest
import random
import os
from numpy import complex128, real, sqrt, sin, cos, angle, ceil, log
from numpy import zeros, argmax, array
from pycbc import DYN_RANGE_FAC
from pycbc.waveform import get_td_waveform, get_fd_waveform, td_approximants, fd_approximants
from pycbc.pnutils import nearest_larger_binary_number
from pycbc.types import FrequencySeries, TimeSeries, complex_same_precision_as
from pycbc.types import load_frequencyseries
from pycbc.filter import sigmasq, overlap_cplx, matched_filter_core
from pycbc.filter import compute_max_snr_over_sky_loc_stat
from pycbc.filter import compute_max_snr_over_sky_loc_stat_no_phase
from pycbc.filter import compute_u_val_for_sky_loc_stat_no_phase
from pycbc.filter import compute_u_val_for_sky_loc_stat
from pycbc import psd
from pycbc import vetoes
from utils import parse_args_all_schemes, simple_exit

_scheme, _context = parse_args_all_schemes("correlate")

expected_results = {}
for idx in range(4):
    expected_results[idx] = {}
    for jdx in range(4):
        expected_results[idx][jdx] = {}

expected_results[0][0]['Ip_snr'] = 100.0
expected_results[0][0]['Ip_angle'] = -1.88619488652e-18
expected_results[0][0]['Ip_argmax'] = 0
expected_results[0][0]['Ic_snr'] = 98.7349100759
expected_results[0][0]['Ic_angle'] = 1.60960753393
expected_results[0][0]['Ic_argmax'] = 3

expected_results[0][1]['Ip_snr'] = 96.3390579783
expected_results[0][1]['Ip_angle'] = 0.511744420131
expected_results[0][1]['Ip_argmax'] = 1387
expected_results[0][1]['Ic_snr'] = 96.3390579783
expected_results[0][1]['Ic_angle'] = 2.08254074693
expected_results[0][1]['Ic_argmax'] = 1387

expected_results[0][2]['Ip_snr'] = 98.8434423546
expected_results[0][2]['Ip_angle'] = 0.566451407787
expected_results[0][2]['Ip_argmax'] = 1100
expected_results[0][2]['Ic_snr'] = 98.8485523538
expected_results[0][2]['Ic_angle'] = 2.10418718318
expected_results[0][2]['Ic_argmax'] = 1099

expected_results[0][3]['Ip_snr'] = 96.4239530554
expected_results[0][3]['Ip_angle'] = -0.946889162447
expected_results[0][3]['Ip_argmax'] = 1447
expected_results[0][3]['Ic_snr'] = 95.6528566731
expected_results[0][3]['Ic_angle'] = 1.10466400896
expected_results[0][3]['Ic_argmax'] = 1484

expected_results[1][0]['Ip_snr'] = 96.3390579783
expected_results[1][0]['Ip_angle'] = -0.511744420131
expected_results[1][0]['Ip_argmax'] = 260757
expected_results[1][0]['Ic_snr'] = 94.4282712604
expected_results[1][0]['Ic_angle'] = 0.957548192904
expected_results[1][0]['Ic_argmax'] = 260753

expected_results[1][1]['Ip_snr'] = 100.0
expected_results[1][1]['Ip_angle'] = -1.73241699772e-18
expected_results[1][1]['Ip_argmax'] = 0
expected_results[1][1]['Ic_snr'] = 100.0
expected_results[1][1]['Ic_angle'] = 1.57079632679
expected_results[1][1]['Ic_argmax'] = 0

expected_results[1][2]['Ip_snr'] = 97.6397283701
expected_results[1][2]['Ip_angle'] = 0.156578884423
expected_results[1][2]['Ip_argmax'] = 261862
expected_results[1][2]['Ic_snr'] = 97.7584101045
expected_results[1][2]['Ic_angle'] = 1.69481552225
expected_results[1][2]['Ic_argmax'] = 261861

expected_results[1][3]['Ip_snr'] = 99.4434573331
expected_results[1][3]['Ip_angle'] = -1.50330148916
expected_results[1][3]['Ip_argmax'] = 58
expected_results[1][3]['Ic_snr'] = 98.0771994342
expected_results[1][3]['Ic_angle'] = 0.521399663782
expected_results[1][3]['Ic_argmax'] = 94

expected_results[2][0]['Ip_snr'] = 98.8434423546
expected_results[2][0]['Ip_angle'] = -0.566451407787
expected_results[2][0]['Ip_argmax'] = 261044
expected_results[2][0]['Ic_snr'] = 96.8727372348
expected_results[2][0]['Ic_angle'] = 0.921151545297
expected_results[2][0]['Ic_argmax'] = 261041

expected_results[2][1]['Ip_snr'] = 97.6397283701
expected_results[2][1]['Ip_angle'] = -0.156578884423
expected_results[2][1]['Ip_argmax'] = 282
expected_results[2][1]['Ic_snr'] = 97.6397283701
expected_results[2][1]['Ic_angle'] = 1.41421744237
expected_results[2][1]['Ic_argmax'] = 282

expected_results[2][2]['Ip_snr'] = 100.0
expected_results[2][2]['Ip_angle'] = 2.41820532326e-18
expected_results[2][2]['Ip_argmax'] = 0
expected_results[2][2]['Ic_snr'] = 99.988543227
expected_results[2][2]['Ic_angle'] = 1.5377922043
expected_results[2][2]['Ic_argmax'] = 262143

expected_results[2][3]['Ip_snr'] = 97.3725007917
expected_results[2][3]['Ip_angle'] = -1.61086911817
expected_results[2][3]['Ip_argmax'] = 342
expected_results[2][3]['Ic_snr'] = 96.2035744912
expected_results[2][3]['Ic_angle'] = 0.442670138337
expected_results[2][3]['Ic_argmax'] = 379

expected_results[3][0]['Ip_snr'] = 96.4239530554
expected_results[3][0]['Ip_angle'] = 0.946889162447
expected_results[3][0]['Ip_argmax'] = 260697
expected_results[3][0]['Ic_snr'] = 94.4958639934
expected_results[3][0]['Ic_angle'] = 2.41579357775
expected_results[3][0]['Ic_argmax'] = 260693

expected_results[3][1]['Ip_snr'] = 99.4434573331
expected_results[3][1]['Ip_angle'] = 1.50330148916
expected_results[3][1]['Ip_argmax'] = 262086
expected_results[3][1]['Ic_snr'] = 99.4434573331
expected_results[3][1]['Ic_angle'] = 3.07409781595
expected_results[3][1]['Ic_argmax'] = 262086

expected_results[3][2]['Ip_snr'] = 97.3725007917
expected_results[3][2]['Ip_angle'] = 1.61086911817
expected_results[3][2]['Ip_argmax'] = 261802
expected_results[3][2]['Ic_snr'] = 97.3906866656
expected_results[3][2]['Ic_angle'] = -3.11216854627
expected_results[3][2]['Ic_argmax'] = 261802

expected_results[3][3]['Ip_snr'] = 100.0
expected_results[3][3]['Ip_angle'] = 9.03608368726e-19
expected_results[3][3]['Ip_argmax'] = 0
expected_results[3][3]['Ic_snr'] = 99.4335056063
expected_results[3][3]['Ic_angle'] = 2.02876392072
expected_results[3][3]['Ic_argmax'] = 36

def generate_detector_strain(template_params, h_plus, h_cross):
    polarization = 0

    if hasattr(template_params, 'polarization'):
        polarization = template_params.polarization

    f_plus = cos(polarization)
    f_cross = sin(polarization)

    return h_plus * f_plus + h_cross * f_cross

def make_padded_frequency_series(vec, filter_N=None, delta_f=None):
    """Convert vec (TimeSeries or FrequencySeries) to a FrequencySeries. If
    filter_N and/or delta_f are given, the output will take those values. If
    not told otherwise the code will attempt to pad a timeseries first such that
    the waveform will not wraparound. However, if delta_f is specified to be
    shorter than the waveform length then wraparound *will* be allowed.
    """
    if filter_N is None:
        power = ceil(log(len(vec), 2)) + 1
        N = 2 ** power
    else:
        N = filter_N
    n = N / 2 + 1

    if isinstance(vec, FrequencySeries):
        vectilde = FrequencySeries(zeros(n, dtype=complex_same_precision_as(vec)),
                                   delta_f=1.0, copy=False)
        if len(vectilde) < len(vec):
            cplen = len(vectilde)
        else:
            cplen = len(vec)
        vectilde[0:cplen] = vec[0:cplen]
        delta_f = vec.delta_f

    elif isinstance(vec, TimeSeries):
        # First determine if the timeseries is too short for the specified df
        # and increase if necessary
        curr_length = len(vec)
        new_length = int(nearest_larger_binary_number(curr_length))
        while new_length * vec.delta_t < 1./delta_f:
            new_length = new_length * 2
        vec.resize(new_length)
        # Then convert to frequencyseries
        v_tilde = vec.to_frequencyseries()
        # Then convert frequencyseries to required length and spacing by keeping
        # only every nth sample if delta_f needs increasing, and cutting at
        # Nyquist if the max frequency is too high.
        # NOTE: This assumes that the input and output data is using binary
        #       lengths.
        i_delta_f = v_tilde.get_delta_f()
        v_tilde = v_tilde.numpy()
        df_ratio = int(delta_f / i_delta_f)
        n_freq_len = int((n-1) * df_ratio +1)
        assert(n <= len(v_tilde))
        df_ratio = int(delta_f / i_delta_f)
        v_tilde = v_tilde[:n_freq_len:df_ratio]
        vectilde = FrequencySeries(v_tilde, delta_f=delta_f, dtype=complex128)

    return FrequencySeries(vectilde * DYN_RANGE_FAC, delta_f=delta_f,
                           dtype=complex128)

def get_waveform(wf_params, start_frequency, sample_rate, length,
                 filter_rate, sky_max_template=False):
    delta_f = filter_rate / float(length)
    if wf_params.approximant in fd_approximants():
        hp, hc = get_fd_waveform(wf_params, delta_f=delta_f,
                                 f_lower=start_frequency)

    elif wf_params.approximant in td_approximants():
        hp, hc = get_td_waveform(wf_params, delta_t=1./sample_rate,
                                 f_lower=start_frequency)

    if not sky_max_template:
        hvec = generate_detector_strain(wf_params, hp, hc)
        return make_padded_frequency_series(hvec, length, delta_f=delta_f)
    else:
        return make_padded_frequency_series(hp, length, delta_f=delta_f), \
            make_padded_frequency_series(hc, length, delta_f=delta_f)

class DummyClass(object):
    pass

class TestChisq(unittest.TestCase):
    def setUp(self, *args):
        # Where are my data files?
        if os.path.isfile('test/data/ZERO_DET_high_P.txt'):
            self.dataDir = 'test/data/'
        elif os.path.isfile('data/ZERO_DET_high_P.txt'):
            self.dataDir = 'data/'
        else:
            self.assertTrue(False, msg="Cannot find data files!")
        self.context = _context
        self.scheme = _scheme
        self.tolerance = 1e-6
        self.filter_t_length = 16
        self.low_freq_filter = 30.
        self.sample_rate = 16384
        self.filter_N = int(self.filter_t_length * self.sample_rate)
        self.filter_n = int(self.filter_N / 2 + 1)
        self.filter_delta_f = 1.0 / self.filter_t_length
        self.psd = psd.from_string('aLIGOZeroDetHighPowerGWINC',
                                   self.filter_n, self.filter_delta_f,
                                   self.low_freq_filter)
        self.psd *= DYN_RANGE_FAC*DYN_RANGE_FAC
        wps1 = DummyClass()
        wps1.mass1 = 123.7627
        wps1.mass2 = 72.55471
        wps1.inclination = 1.125029
        wps1.coa_phase = 2.906049
        wps1.approximant='EOBNRv2HM'
        self.wps1 = wps1

        wps2 = DummyClass()
        wps2.mass1 = 131.460647583
        wps2.mass2 = 69.0030059814
        wps2.inclination = 0.8432287
        wps2.coa_phase = 0.2
        wps2.approximant = 'SEOBNRv4_ROM'
        self.wps2 = wps2

        wps3 = copy.deepcopy(wps2)
        wps3.approximant = 'EOBNRv2HM_ROM'
        self.wps3 = wps3

        wps4 = copy.deepcopy(wps2)
        wps4.spin1x = 0.8
        wps4.spin2y = -0.9
        wps4.approximant = 'IMRPhenomPv2'
        self.wps4 = wps4

        self.wps_list = [wps1, wps2, wps3, wps4]

        self.sm_power_chisq = vetoes.SingleDetSkyMaxPowerChisq(num_bins='1')
        self.sm_power_chisq2 = vetoes.SingleDetSkyMaxPowerChisq(num_bins='10')
        self.power_chisq = vetoes.SingleDetPowerChisq(num_bins='1')
        self.power_chisq2 = vetoes.SingleDetPowerChisq(num_bins='10')


    def test_filtering(self):
        idx = self.idx
        jdx = self.jdx
        #w1 = self.wps_list[idx]
        #w2 = self.wps_list[jdx]
        #stilde = get_waveform(w1, self.low_freq_filter-1,
        #                      self.sample_rate, self.filter_N,
        #                      self.sample_rate)
        #try:
        #    stilde.save('data/skymaxtest_stilde_%d.hdf' % idx)
        #except:
        #    pass
        stilde = load_frequencyseries(self.dataDir + \
                                      'skymaxtest_stilde_%d.hdf' % idx)
        s_norm = sigmasq(stilde, psd=self.psd,
                         low_frequency_cutoff=self.low_freq_filter)
        stilde /= sqrt(float(s_norm))
        stilde *= 100
        #hplus, hcross = get_waveform(w2, self.low_freq_filter-1,
        #                             self.sample_rate, self.filter_N,
        #                             self.sample_rate, sky_max_template=True)
        #try:
        #    hplus.save('data/skymaxtest_hplus_%d.hdf' % jdx)
        #    hcross.save('data/skymaxtest_hcross_%d.hdf' % jdx)
        #except:
        #    pass
        hplus = load_frequencyseries(self.dataDir + \
                                     'skymaxtest_hplus_%d.hdf' % jdx)
        hcross = load_frequencyseries(self.dataDir + \
                                      'skymaxtest_hcross_%d.hdf' % jdx)
        hplus.f_lower = self.low_freq_filter
        hplus.params = random.randint(0,100000000000)
        hcross.f_lower = self.low_freq_filter
        hcross.params = random.randint(0,100000000000)
        hp_norm = sigmasq(hplus, psd=self.psd,
                          low_frequency_cutoff=self.low_freq_filter)
        hc_norm = sigmasq(hcross, psd=self.psd,
                          low_frequency_cutoff=self.low_freq_filter)
        hplus /= sqrt(float(hp_norm))
        hcross /= sqrt(float(hc_norm))
        hpc_corr = overlap_cplx(hplus, hcross, psd=self.psd,
                                low_frequency_cutoff=self.low_freq_filter,
                                normalized=False)
        hpc_corr_R = real(hpc_corr)
        I_plus, corr_plus, n_plus = matched_filter_core\
            (hplus, stilde, psd=self.psd,
             low_frequency_cutoff=self.low_freq_filter, h_norm=1.)
        # FIXME: Remove the deepcopies before merging with master
        I_plus = copy.deepcopy(I_plus)
        corr_plus = copy.deepcopy(corr_plus)
        I_cross, corr_cross, n_cross = matched_filter_core\
            (hcross, stilde, psd=self.psd,
             low_frequency_cutoff=self.low_freq_filter, h_norm=1.)
        I_cross = copy.deepcopy(I_cross)
        corr_cross = copy.deepcopy(corr_cross)
        I_plus = I_plus * n_plus
        I_cross = I_cross * n_cross
        IPM = abs(I_plus.data).argmax()
        ICM = abs(I_cross.data).argmax()
        self.assertAlmostEqual(abs(I_plus[IPM]),
                               expected_results[idx][jdx]['Ip_snr'])
        self.assertAlmostEqual(angle(I_plus[IPM]),
                               expected_results[idx][jdx]['Ip_angle'])
        self.assertEqual(IPM, expected_results[idx][jdx]['Ip_argmax'])
        self.assertAlmostEqual(abs(I_cross[ICM]),
                               expected_results[idx][jdx]['Ic_snr'])
        self.assertAlmostEqual(angle(I_cross[ICM]),
                               expected_results[idx][jdx]['Ic_angle'])
        self.assertEqual(ICM, expected_results[idx][jdx]['Ic_argmax'])

        #print "expected_results[{}][{}]['Ip_snr'] = {}" .format(idx,jdx,abs(I_plus[IPM]))
        #print "expected_results[{}][{}]['Ip_angle'] = {}".format(idx,jdx,angle(I_plus[IPM]))
        #print "expected_results[{}][{}]['Ip_argmax'] = {}".format(idx,jdx, IPM)
        #print "expected_results[{}][{}]['Ic_snr'] = {}" .format(idx,jdx,abs(I_cross[ICM]))
        #print "expected_results[{}][{}]['Ic_angle'] = {}".format(idx,jdx,angle(I_cross[ICM]))
        #print "expected_results[{}][{}]['Ic_argmax'] = {}".format(idx,jdx, ICM)

        det_stat_prec = compute_max_snr_over_sky_loc_stat\
            (I_plus, I_cross, hpc_corr_R, hpnorm=1., hcnorm=1.,
             thresh=0.1, analyse_slice=slice(0,len(I_plus.data)))
        det_stat_hom = compute_max_snr_over_sky_loc_stat_no_phase\
            (I_plus, I_cross, hpc_corr_R, hpnorm=1., hcnorm=1.,
             thresh=0.1, analyse_slice=slice(0,len(I_plus.data)))
        idx_max_prec = argmax(det_stat_prec.data)
        idx_max_hom = argmax(det_stat_hom.data)
        max_ds_prec = det_stat_prec[idx_max_prec]
        max_ds_hom = det_stat_hom[idx_max_hom]

        uvals_prec, _ = compute_u_val_for_sky_loc_stat\
            (I_plus.data, I_cross.data, hpc_corr_R, indices=[idx_max_prec],
             hpnorm=1., hcnorm=1.)
        uvals_hom, _ = compute_u_val_for_sky_loc_stat_no_phase\
            (I_plus.data, I_cross.data, hpc_corr_R, indices=[idx_max_hom],
             hpnorm=1., hcnorm=1.)

        ht = hplus * uvals_hom[0] + hcross
        ht_norm = sigmasq(ht, psd=self.psd,
                          low_frequency_cutoff=self.low_freq_filter)
        ht /= sqrt(float(ht_norm))
        ht.f_lower = self.low_freq_filter
        ht.params = random.randint(0,100000000000)
        I_t, corr_t, n_t = matched_filter_core\
            (ht, stilde, psd=self.psd,
             low_frequency_cutoff=self.low_freq_filter, h_norm=1.)
        I_t = I_t * n_t
        self.assertAlmostEqual(abs(real(I_t.data[idx_max_hom])), max_ds_hom)
        self.assertEqual(abs(real(I_t.data[idx_max_hom])),
                         max(abs(real(I_t.data))))
        chisq, _ = self.power_chisq.values\
            (corr_t, array([max_ds_hom]) / n_plus, n_t, self.psd,
             array([idx_max_hom]), ht)

        ht = hplus * uvals_prec[0] + hcross
        ht_norm = sigmasq(ht, psd=self.psd,
                          low_frequency_cutoff=self.low_freq_filter)
        ht /= sqrt(float(ht_norm))
        ht.f_lower = self.low_freq_filter
        ht.params = random.randint(0,100000000000)
        I_t, corr_t, n_t = matched_filter_core\
            (ht, stilde, psd=self.psd,
             low_frequency_cutoff=self.low_freq_filter, h_norm=1.)
        I_t = I_t * n_t

        chisq, _ = self.power_chisq.values\
            (corr_t, array([max_ds_prec]) / n_plus, n_t, self.psd,
             array([idx_max_prec]), ht)

        self.assertAlmostEqual(abs(I_t.data[idx_max_prec]), max_ds_prec)
        self.assertEqual(idx_max_prec, abs(I_t.data).argmax())
        self.assertTrue(chisq < 1E-4)



def test_maker(class_name, idx, jdx):
    class Test(class_name):
        idx = idx
        jdx = jdx

    Test.__name__ = "Test %s" % '_'.join([str(idx),str(jdx)])
    return Test


suite = unittest.TestSuite()
for idx in range(4):
    for jdx in range(4):
        curr_cls = test_maker(TestChisq, idx, jdx)
        suite.addTest(unittest.TestLoader().loadTestsFromTestCase(curr_cls))

if __name__ == '__main__':
    results = unittest.TextTestRunner(verbosity=2).run(suite)
    simple_exit(results)

back to top