https://github.com/GeoscienceAustralia/PyRate
Tip revision: 3fd3e706522d0f4675eb006a9e9a141931668675 authored by Sudipta Basak on 17 June 2021, 05:56:54 UTC
WIP orbital correction tests
WIP orbital correction tests
Tip revision: 3fd3e70
test_timeseries.py
# This Python module is part of the PyRate software package.
#
# Copyright 2020 Geoscience Australia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# coding: utf-8
"""
This Python module contains tests for the timeseries.py PyRate module.
"""
import os
import shutil
from copy import deepcopy
import pytest
from datetime import date, timedelta
from numpy import nan, asarray, where, array
import numpy as np
from numpy.testing import assert_array_almost_equal
import pyrate.constants as C
import pyrate.core.orbital
import pyrate.core.prepifg_helper
import pyrate.core.ref_phs_est
import pyrate.core.refpixel
import tests.common as common
from pyrate.core import mst, covariance
from pyrate import correct, prepifg, conv2tif
from pyrate.configuration import Configuration
from pyrate.core.timeseries import time_series, linear_rate_pixel, linear_rate_array, TimeSeriesError
def default_params():
return {C.TIME_SERIES_METHOD: 1,
C.TIME_SERIES_PTHRESH: 0,
C.TIME_SERIES_SM_ORDER: 2,
C.TIME_SERIES_SM_FACTOR: -0.25,
C.PARALLEL: 0,
C.PROCESSES: 1,
C.NAN_CONVERSION: 1,
C.NO_DATA_VALUE: 0}
class SinglePixelIfg(object):
"""
A single pixel ifg (interferogram) solely for unit testing
"""
def __init__(self, first, second, phase, nan_fraction):
self.phase_data = asarray([[phase]])
self.first = first
self.second = second
self.nrows = 1
self.ncols = 1
self.nan_fraction = asarray([nan_fraction])
def convert_to_nans(self, val=0):
"""
Converts given values in phase data to NaNs
val - value to convert, default is 0
"""
self.phase_data = where(self.phase_data == val, nan, self.phase_data)
self.nan_converted = True
class TestTimeSeries:
"""Verifies error checking capabilities of the time_series function"""
@classmethod
def setup_class(cls):
cls.ifgs = common.small_data_setup()
cls.params = default_params()
cls.mstmat = mst.mst_boolean_array(cls.ifgs)
r_dist = covariance.RDist(cls.ifgs[0])()
cls.maxvar = [covariance.cvd(i, cls.params, r_dist)[0]
for i in cls.ifgs]
cls.vcmt = covariance.get_vcmt(cls.ifgs, cls.maxvar)
def test_time_series_unit(self):
"""
Checks that the code works the same as the calculated example
"""
ifirst = asarray([1, 1, 2, 2, 3, 3, 4, 5])
isecond = asarray([2, 4, 3, 4, 5, 6, 6, 6])
timeseries = asarray([0.0, 0.1, 0.6, 0.8, 1.1, 1.3])
phase = asarray([0.5, 4, 2.5, 3.5, 2.5, 3.5, 2.5, 1])
nan_fraction = asarray([0.5, 0.4, 0.2, 0.3, 0.1, 0.3, 0.2, 0.1])
now = date.today()
dates = [now + timedelta(days=(t*365.25)) for t in timeseries]
dates.sort()
first = [dates[m_num - 1] for m_num in ifirst]
second = [dates[s_num - 1] for s_num in isecond]
self.ifgs = [SinglePixelIfg(m, s, p, n) for m, s, p, n in
zip(first, second, phase, nan_fraction)]
tsincr, tscum, tsvel = time_series(
self.ifgs, params=self.params, vcmt=self.vcmt, mst=None)
expected = asarray([[[0.50, 3.0, 4.0, 5.5, 6.5]]])
assert_array_almost_equal(tscum, expected, decimal=2)
class TestLegacyTimeSeriesEquality:
@classmethod
def setup_class(cls):
params = Configuration(common.TEST_CONF_ROIPAC).__dict__
params[C.TEMP_MLOOKED_DIR] = os.path.join(params[C.OUT_DIR],
C.TEMP_MLOOKED_DIR)
conv2tif.main(params)
prepifg.main(params)
params[C.REF_EST_METHOD] = 2
xlks, _, crop = pyrate.core.prepifg_helper.transform_params(params)
dest_paths, headers = common.repair_params_for_correct_tests(params[C.INTERFEROGRAM_DIR], params)
correct._copy_mlooked(params)
copied_dest_paths = [os.path.join(params[C.TEMP_MLOOKED_DIR], os.path.basename(d)) for d in dest_paths]
del dest_paths
# start run_pyrate copy
ifgs = common.pre_prepare_ifgs(copied_dest_paths, params)
mst_grid = common.mst_calculation(copied_dest_paths, params)
refx, refy = pyrate.core.refpixel.ref_pixel_calc_wrapper(params)
params[C.REFX] = refx
params[C.REFY] = refy
params[C.ORBFIT_OFFSET] = True
# Estimate and remove orbit errors
pyrate.core.orbital.remove_orbital_error(ifgs, params)
ifgs = common.prepare_ifgs_without_phase(copied_dest_paths, params)
for ifg in ifgs:
ifg.close()
correct._update_params_with_tiles(params)
_, ifgs = pyrate.core.ref_phs_est.ref_phase_est_wrapper(params)
ifgs[0].open()
r_dist = covariance.RDist(ifgs[0])()
ifgs[0].close()
maxvar = [covariance.cvd(i, params, r_dist)[0] for i in copied_dest_paths]
for ifg in ifgs:
ifg.open()
vcmt = covariance.get_vcmt(ifgs, maxvar)
for ifg in ifgs:
ifg.close()
ifg.open()
ifg.nodata_value = 0.0
params[C.TIME_SERIES_METHOD] = 1
params[C.PARALLEL] = 0
# Calculate time series
cls.tsincr_0, cls.tscum_0, _ = common.calculate_time_series(ifgs, params, vcmt, mst=mst_grid)
params[C.PARALLEL] = 1
cls.tsincr_1, cls.tscum_1, cls.tsvel_1 = common.calculate_time_series(ifgs, params, vcmt, mst=mst_grid)
# load the legacy data
ts_dir = os.path.join(common.SML_TEST_DIR, 'time_series')
tsincr_path = os.path.join(ts_dir, 'ts_incr_interp0_method1.csv')
ts_incr = np.genfromtxt(tsincr_path)
tscum_path = os.path.join(ts_dir, 'ts_cum_interp0_method1.csv')
ts_cum = np.genfromtxt(tscum_path)
cls.ts_incr = np.reshape(ts_incr, newshape=cls.tsincr_0.shape, order='F')
cls.ts_cum = np.reshape(ts_cum, newshape=cls.tscum_0.shape, order='F')
cls.params = params
@classmethod
def teardown_class(cls):
shutil.rmtree(cls.params[C.OUT_DIR])
def test_time_series_equality_parallel_by_rows(self):
"""
check time series parallel by rows jobs
"""
self.assertEqual(self.tsincr_1.shape, self.tscum_1.shape)
self.assertEqual(self.tsvel_1.shape, self.tsincr_1.shape)
np.testing.assert_array_almost_equal(
self.ts_incr, self.tsincr_1, decimal=3)
np.testing.assert_array_almost_equal(
self.ts_cum, self.tscum_1, decimal=3)
def test_time_series_equality_serial_by_the_pixel(self):
"""
check time series
"""
self.assertEqual(self.tsincr_0.shape, self.tscum_0.shape)
np.testing.assert_array_almost_equal(
self.ts_incr, self.tsincr_0, decimal=3)
np.testing.assert_array_almost_equal(
self.ts_cum, self.tscum_0, decimal=3)
@staticmethod
def assertEqual(val1, val2):
assert val1 == val2
class TestLegacyTimeSeriesEqualityMethod2Interp0:
@classmethod
def setup_class(cls):
params = Configuration(common.TEST_CONF_ROIPAC).__dict__
params[C.TEMP_MLOOKED_DIR] = os.path.join(params[C.OUT_DIR],
C.TEMP_MLOOKED_DIR)
conv2tif.main(params)
prepifg.main(params)
params[C.REF_EST_METHOD] = 2
xlks, _, crop = pyrate.core.prepifg_helper.transform_params(params)
dest_paths, headers = common.repair_params_for_correct_tests(params[C.INTERFEROGRAM_DIR], params)
correct._copy_mlooked(params)
copied_dest_paths = [os.path.join(params[C.TEMP_MLOOKED_DIR], os.path.basename(d)) for d in dest_paths]
del dest_paths
# start run_pyrate copy
ifgs = common.pre_prepare_ifgs(copied_dest_paths, params)
mst_grid = common.mst_calculation(copied_dest_paths, params)
refx, refy = pyrate.core.refpixel.ref_pixel_calc_wrapper(params)
params[C.REFX] = refx
params[C.REFY] = refy
params[C.ORBFIT_OFFSET] = True
# Estimate and remove orbit errors
pyrate.core.orbital.remove_orbital_error(ifgs, params)
ifgs = common.prepare_ifgs_without_phase(copied_dest_paths, params)
for ifg in ifgs:
ifg.close()
correct._update_params_with_tiles(params)
_, ifgs = pyrate.core.ref_phs_est.ref_phase_est_wrapper(params)
ifgs[0].open()
r_dist = covariance.RDist(ifgs[0])()
ifgs[0].close()
# Calculate interferogram noise
maxvar = [covariance.cvd(i, params, r_dist)[0] for i in copied_dest_paths]
for ifg in ifgs:
ifg.open()
vcmt = covariance.get_vcmt(ifgs, maxvar)
for ifg in ifgs:
ifg.close()
ifg.open()
ifg.nodata_value = 0.0
params[C.TIME_SERIES_METHOD] = 2
params[C.PARALLEL] = 1
# Calculate time series
cls.tsincr, cls.tscum, _ = common.calculate_time_series(ifgs, params, vcmt, mst=mst_grid)
params[C.PARALLEL] = 0
# Calculate time series serailly by the pixel
cls.tsincr_0, cls.tscum_0, _ = common.calculate_time_series(ifgs, params, vcmt, mst=mst_grid)
# copy legacy data
SML_TIME_SERIES_DIR = os.path.join(common.SML_TEST_DIR, 'time_series')
tsincr_path = os.path.join(SML_TIME_SERIES_DIR, 'ts_incr_interp0_method2.csv')
ts_incr = np.genfromtxt(tsincr_path)
tscum_path = os.path.join(SML_TIME_SERIES_DIR, 'ts_cum_interp0_method2.csv')
ts_cum = np.genfromtxt(tscum_path)
cls.ts_incr = np.reshape(ts_incr, newshape=cls.tsincr_0.shape, order='F')
cls.ts_cum = np.reshape(ts_cum, newshape=cls.tscum_0.shape, order='F')
cls.params = params
@classmethod
def teardown_class(cls):
shutil.rmtree(cls.params[C.OUT_DIR])
def test_time_series_equality_parallel_by_rows(self):
assert self.tsincr.shape == self.tscum.shape
np.testing.assert_array_almost_equal(self.ts_incr, self.tsincr, decimal=1)
np.testing.assert_array_almost_equal(self.ts_cum, self.tscum, decimal=1)
def test_time_series_equality_serial_by_the_pixel(self):
assert self.tsincr_0.shape == self.tscum_0.shape
np.testing.assert_array_almost_equal(self.ts_incr, self.tsincr_0, decimal=3)
np.testing.assert_array_almost_equal(self.ts_cum, self.tscum_0, decimal=3)
class TestLinearRatePixel:
"""
Tests the linear regression algorithm for determining the best
fitting velocity from a cumulative time series
"""
def test_linear_rate_pixel_clean(self):
y = array([0, 2, 4, 6, 8, 10])
t = array([0, 1, 2, 3, 4, 5])
exp = (2.0, 0.0, 1.0, 0.0, 6)
res = linear_rate_pixel(y, t)
assert res == exp
def test_linear_rate_pixel_neg_rate(self):
y = array([0, -2, -4, -6, -8, -10])
t = array([0, 1, 2, 3, 4, 5])
exp = (-2.0, 0.0, 1.0, 0.0, 6)
res = linear_rate_pixel(y, t)
assert res == exp
def test_linear_rate_pixel_outlier(self):
y = array([0, 2, 4, 6, 8, 20])
t = array([0, 1, 2, 3, 4, 5])
exp = (3.428571, -1.904761, 0.812030, 0.824786, 6)
res = linear_rate_pixel(y, t)
assert res == pytest.approx(exp, rel=1e-6)
def test_linear_rate_pixel_noise(self):
y = array([0, 2, 4, 6, 8, 10])
r = y + np.random.rand(6) # add different uniform noise each time
t = array([0, 1, 2, 3, 4, 5])
exprate = 2.0
explsqd = 1.0
experr = 0.0
rate, _, lsqd, err, _ = linear_rate_pixel(y, t)
assert exprate == pytest.approx(rate, rel=1e-1)
assert explsqd == pytest.approx(lsqd, rel=1e-1)
assert experr == pytest.approx(err, rel=1e-1)
def test_linear_rate_pixel_exception(self):
# input vectors should be equal length
y = array([2, 4, 6, 8, 10])
t = array([0, 1, 2, 3, 4, 5])
with pytest.raises(TimeSeriesError):
res = linear_rate_pixel(y, t)
def test_linear_rate_pixel_nans(self):
# at least two obs are required for line fitting
y = array([0, nan, nan, nan, nan, nan])
t = array([0, 1, 2, 3, 4, 5])
exp = (nan, nan, nan, nan, nan)
res = linear_rate_pixel(y, t)
assert res == exp
class TestLinearRateArray:
"""
Tests the array loop wrapper for the linear regression algorithm using real data
"""
@classmethod
@pytest.fixture(autouse=True)
def setup_class(cls, roipac_params):
cls.params = roipac_params
cls.ifgs = common.small_data_setup()
# read in input (tscuml) and expected output arrays
tscuml_path = os.path.join(common.SML_TEST_LINRATE, "tscuml_0.npy")
cls.tscuml0 = np.load(tscuml_path)
# add zero epoch to tscuml 3D array
cls.tscuml = np.insert(cls.tscuml0, 0, 0, axis=2)
linrate_path = os.path.join(common.SML_TEST_LINRATE, "linear_rate.npy")
cls.linrate = np.load(linrate_path)
error_path = os.path.join(common.SML_TEST_LINRATE, "linear_error.npy")
cls.error = np.load(error_path)
icpt_path = os.path.join(common.SML_TEST_LINRATE, "linear_intercept.npy")
cls.icpt = np.load(icpt_path)
samp_path = os.path.join(common.SML_TEST_LINRATE, "linear_samples.npy")
cls.samp = np.load(samp_path)
rsq_path = os.path.join(common.SML_TEST_LINRATE, "linear_rsquared.npy")
cls.rsq = np.load(rsq_path)
def test_linear_rate_array(self):
"""
Input and expected output are on disk. This test only tests the linear_rate_array
and linear_rate_pixel functions using real data.
"""
l, i, r, e, s = linear_rate_array(self.tscuml, self.ifgs, self.params)
# test to 20 decimal places
assert_array_almost_equal(self.linrate, l, 1e-20)
assert_array_almost_equal(self.icpt, i, 1e-20)
assert_array_almost_equal(self.rsq, r, 1e-20)
assert_array_almost_equal(self.error, e, 1e-20)
assert_array_almost_equal(self.samp, s, 1e-20)
def test_linear_rate_array_two_sigma(self):
"""
Check that the "nsigma" switch in the config dictionary
actually results in a change in the error map.
"""
# make a deep copy of the params dict to avoid changing
# state for other tests if this one fails
params = deepcopy(self.params)
params[C.VELERROR_NSIG] = 2
_, _, _, e, _ = linear_rate_array(self.tscuml, self.ifgs, params)
assert_array_almost_equal(self.error*2, e, 1e-20)
def test_linear_rate_array_exception(self):
# depth of tscuml should equal nepochs
with pytest.raises(TimeSeriesError):
res = linear_rate_array(self.tscuml0, self.ifgs, self.params)