https://github.com/GeoscienceAustralia/PyRate
Tip revision: 3fd3e706522d0f4675eb006a9e9a141931668675 authored by Sudipta Basak on 17 June 2021, 05:56:54 UTC
WIP orbital correction tests
WIP orbital correction tests
Tip revision: 3fd3e70
test_aps.py
# This Python module is part of the PyRate software package.
#
# Copyright 2020 Geoscience Australia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This Python module contains tests for the APS PyRate module.
"""
import os
import shutil
import pytest
import numpy as np
from os.path import join
from scipy.ndimage import gaussian_filter1d, gaussian_filter
from numpy.testing import assert_array_almost_equal
import pyrate.constants as C
from pyrate import conv2tif, prepifg, correct
from pyrate.configuration import Configuration, MultiplePaths
from pyrate.core import shared
from pyrate.core.aps import wrap_spatio_temporal_filter, _interpolate_nans_2d, _kernel
from pyrate.core.aps import gaussian_temporal_filter as tlpfilter, gaussian_spatial_filter as slpfilter
from pyrate.core.shared import Ifg
from pyrate.core.ifgconstants import DAYS_PER_YEAR
from tests import common
from tests.common import BASE_TEST, PY37GDAL304
@pytest.fixture(params=["linear", "nearest", "cubic"])
def slpnanfill_method(request):
return request.param
@pytest.fixture(params=[0.1, 0.5, 1, 5])
def slpfcutoff_method(request):
return request.param
@pytest.fixture(params=[0.001, 0.01])
def slpfcutoff(request):
return request.param
def test_interpolate_nans_2d(slpnanfill_method):
arr = np.random.rand(200, 100)
arr[arr < 0.1] = np.nan # insert some nans
assert np.sum(np.isnan(arr)) != 0 # some nans present
_interpolate_nans_2d(arr, method=slpnanfill_method)
assert np.sum(np.isnan(arr)) == 0 # should not be any nans
class TestSpatialFilter:
"""
Test the implementation of Gaussian spatial filter
"""
def setup_method(self):
ifg_path = join(str(BASE_TEST), 'cropB', '20180106-20180130_ifg.tif')
ifg = Ifg(ifg_path)
ifg.open()
p = ifg.phase_data
self.x_size = ifg.x_size
self.y_size = ifg.y_size
# convert zeros to NaNs
p[p == 0] = np.nan
self.phase = p
def test_gaussian_filter(self, slpfcutoff_method):
"""
Compare against scipy.ndimage.gaussian_filter,
that operates in the spatial domain on data with equal resolution in x and y
"""
e = gaussian_filter(self.phase, sigma=slpfcutoff_method)
r = slpfilter(self.phase, cutoff=slpfcutoff_method,
x_size=self.x_size, y_size=self.y_size)
# pull data from middle of images; away from potential edge effects
exp = e[50:150, 50:150]
res = r[50:150, 50:150]
assert_array_almost_equal(res, exp, 0)
def test_gaussian_kernel():
"""
Test the Gaussian smoothing kernel
"""
x = np.arange(1, 10, 2)
res = _kernel(x, 3)
exp = np.array([0.94595947, 0.60653066, 0.24935221, 0.06572853, 0.011109])
np.testing.assert_array_almost_equal(res, exp, decimal=6)
res = _kernel(x, 4)
exp = np.array([0.96923323, 0.7548396, 0.45783336, 0.21626517, 0.07955951])
np.testing.assert_array_almost_equal(res, exp, decimal=6)
res = _kernel(x, 0.001)
exp = np.array([0, 0, 0, 0, 0])
np.testing.assert_array_equal(res, exp)
class TestTemporalFilter:
"""
Tests for the temporal filter with synthetic data for a single pixel
"""
def setup_method(self):
self.thr = 1 # no nans in these test cases, threshold = 1
# instance of normally distributed noise
n = np.array([-0.36427456, 0.69539061, 0.42181139, -2.56306134,
0.55844095, -0.65562626, 0.65607911, 1.19431637,
-1.43837395, -0.91656358])
# synthetic incremental displacement
d = np.array([1., 1., 0.7, 0.3, 0., 0.1, 0.2, 0.6, 1., 1.])
# incremental displacement + noise
self.tsincr = d * 2 + n
# regular time series, every 12 days
self.interval = 12 / DAYS_PER_YEAR # 0.03285 years
intv = np.ones(d.shape, dtype=np.float32) * self.interval
self.span = np.cumsum(intv)
def test_tlpfilter_repeatability(self):
"""
TEST 1: check for repeatability against expected result from
tlpfilter. Cutoff equal to sampling interval (sigma=1)
"""
res = tlpfilter(self.tsincr, self.interval, self.span, self.thr)
exp = np.array([1.9936507, 1.9208364, 1.0252733, -0.07402889,
-0.1842336, 0.24325351, 0.94737214, 1.3890865,
1.1903466, 1.0036403])
np.testing.assert_array_almost_equal(res, exp, decimal=6)
def test_tlpfilter_scipy_sig1(self):
"""
TEST 2: compare tlpfilter to scipy.ndimage.gaussian_filter1d. Works for
regularly sampled data. Cutoff equal to sampling interval (sigma=1)
"""
res = tlpfilter(self.tsincr, self.interval, self.span, self.thr)
exp = gaussian_filter1d(self.tsincr, sigma=1)
np.testing.assert_array_almost_equal(res, exp, decimal=1)
def test_tlpfilter_scipy_sig2(self):
"""
TEST 3: compare tlpfilter to scipy.ndimage.gaussian_filter1d. Works for
regularly sampled data. Cutoff equal to twice the sampling interval (sigma=2)
"""
res = tlpfilter(self.tsincr, self.interval * 2, self.span, self.thr)
exp = gaussian_filter1d(self.tsincr, sigma=2)
np.testing.assert_array_almost_equal(res, exp, decimal=1)
def test_tlpfilter_scipy_sig05(self):
"""
TEST 4: compare tlpfilter to scipy.ndimage.gaussian_filter1d. Works for
regularly sampled data. Cutoff equal to half the sampling interval (sigma=0.5)
"""
res = tlpfilter(self.tsincr, self.interval * 0.5, self.span, self.thr)
exp = gaussian_filter1d(self.tsincr, sigma=0.5)
np.testing.assert_array_almost_equal(res, exp, decimal=2)
@pytest.mark.slow
@pytest.mark.skipif(not PY37GDAL304, reason="Only run in one CI env")
class TestAPSErrorCorrectionsOnDiscReused:
@classmethod
def setup_method(cls):
cls.conf = common.TEST_CONF_GAMMA
params = Configuration(cls.conf).__dict__
conv2tif.main(params)
params = Configuration(cls.conf).__dict__
prepifg.main(params)
cls.params = Configuration(cls.conf).__dict__
correct._copy_mlooked(cls.params)
correct._update_params_with_tiles(cls.params)
correct._create_ifg_dict(cls.params)
multi_paths = cls.params[C.INTERFEROGRAM_FILES]
cls.ifg_paths = [p.tmp_sampled_path for p in multi_paths]
cls.ifgs = [shared.Ifg(i) for i in cls.ifg_paths]
for i in cls.ifgs:
i.open()
shared.save_numpy_phase(cls.ifg_paths, cls.params)
correct.mst_calc_wrapper(cls.params)
@classmethod
def teardown_method(cls):
shutil.rmtree(cls.params[C.OUT_DIR])
def test_aps_error_files_on_disc(self, slpnanfill_method, slpfcutoff):
self.params[C.SLPF_NANFILL_METHOD] = slpnanfill_method
self.params[C.SLPF_CUTOFF] = slpfcutoff
wrap_spatio_temporal_filter(self.params)
# test_orb_errors_written
aps_error_files = [MultiplePaths.aps_error_path(i, self.params) for i in self.ifg_paths]
assert all(p.exists() for p in aps_error_files)
last_mod_times = [os.stat(o).st_mtime for o in aps_error_files]
phase_prev = [i.phase_data for i in self.ifgs]
# run aps error removal again
wrap_spatio_temporal_filter(self.params)
aps_error_files2 = [MultiplePaths.aps_error_path(i, self.params) for i in self.ifg_paths]
# if files are written again - times will change
last_mod_times_2 = [os.stat(o).st_mtime for o in aps_error_files2]
# test_aps_error_reused_if_params_unchanged
assert all(a == b for a, b in zip(last_mod_times, last_mod_times_2))
phase_now = [i.phase_data for i in self.ifgs]
# run aps error correction once mroe
wrap_spatio_temporal_filter(self.params)
aps_error_files3 = [MultiplePaths.aps_error_path(i, self.params) for i in self.ifg_paths]
last_mod_times_3 = [os.stat(o).st_mtime for o in aps_error_files3]
assert all(a == b for a, b in zip(last_mod_times, last_mod_times_3))
phase_again = [i.phase_data for i in self.ifgs]
np.testing.assert_array_equal(phase_prev, phase_now)
np.testing.assert_array_equal(phase_prev, phase_again)