https://github.com/GeoscienceAustralia/PyRate
Tip revision: 3fd3e706522d0f4675eb006a9e9a141931668675 authored by Sudipta Basak on 17 June 2021, 05:56:54 UTC
WIP orbital correction tests
WIP orbital correction tests
Tip revision: 3fd3e70
test_stackrate.py
# This Python module is part of the PyRate software package.
#
# Copyright 2020 Geoscience Australia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# coding: utf-8
"""
This Python module contains tests for the stack.py PyRate module.
"""
import os
import shutil
from numpy import eye, array, ones, nan
import numpy as np
from numpy.testing import assert_array_almost_equal, assert_array_equal
import pyrate.constants as C
import pyrate.core.orbital
import pyrate.core.prepifg_helper
import pyrate.core.ref_phs_est
import pyrate.core.refpixel
import tests.common
from pyrate.core import covariance as vcm_module
from pyrate.core.stack import stack_rate_pixel, mask_rate
from pyrate import correct, prepifg, conv2tif
from pyrate.configuration import Configuration
from tests import common
from tests.common import SML_TEST_DIR, prepare_ifgs_without_phase, pre_prepare_ifgs
def default_params():
return {'pthr': 3, 'nsig': 3, 'maxsig': 2, 'parallel': 1, 'processes': 8}
class SinglePixelIfg(object):
def __init__(self, timespan, phase):
self.time_span = timespan
self.phase_data = array([[phase]])
class TestStackRatePixel:
"""
Tests the weighted least squares algorithm for determining
the best fitting velocity
"""
def setup_method(self):
self.phase = array([0.5, 3.5, 4, 2.5, 3.5, 1])
self.timespan = array([[0.1, 0.7, 0.8, 0.5, 0.7, 0.2]])
self.vcmt = eye(6, 6)
self.mst = ones((6, 1, 1))
self.mst[4] = 0
self.params = default_params()
def test_stack_rate_pixel(self):
# Simple test with one pixel and equal weighting
exprate = array([[5.0]])
experr = array([[0.836242010007091]])
expsamp = array([[5]])
rate, error, samples = stack_rate_pixel(self.phase, self.mst, self.vcmt,
self.timespan, self.params['nsig'], self.params['pthr'])
assert_array_almost_equal(rate, exprate)
assert_array_almost_equal(error, experr)
assert_array_almost_equal(samples, expsamp)
class TestMaskRate:
"""
Test the maxsig threshold masking algorithm
"""
def setup_method(self):
self.r = array([5.0, 4.5]) # rates for 2 pixels
self.e = array([1.1, 2.1]) # errors for 2 pixels
def test_mask_rate_maxsig1(self):
# both rate and error values masked
rate, error = mask_rate(self.r, self.e, 1)
assert_array_equal(rate, array([nan, nan]))
assert_array_equal(error, array([nan, nan]))
def test_mask_rate_maxsig2(self):
# one rate and one error masked
rate, error = mask_rate(self.r, self.e, 2)
assert_array_equal(rate, array([5.0, nan]))
assert_array_equal(error, array([1.1, nan]))
def test_mask_rate_maxsig3(self):
# No values masked in rate or error
rate, error = mask_rate(self.r, self.e, 3)
assert_array_equal(rate, self.r)
assert_array_equal(error, self.e)
class TestLegacyEquality:
"""
Tests equality with legacy data
"""
@classmethod
def setup_class(cls):
params = Configuration(common.TEST_CONF_ROIPAC).__dict__
params[C.TEMP_MLOOKED_DIR] = os.path.join(params[C.OUT_DIR], C.TEMP_MLOOKED_DIR)
# force error maps to 1-sigma to match legacy
params[C.VELERROR_NSIG] = 1
conv2tif.main(params)
prepifg.main(params)
params[C.REF_EST_METHOD] = 2
xlks, _, crop = pyrate.core.prepifg_helper.transform_params(params)
dest_paths, headers = common.repair_params_for_correct_tests(params[C.INTERFEROGRAM_DIR], params)
correct._copy_mlooked(params)
copied_dest_paths = [os.path.join(params[C.TEMP_MLOOKED_DIR], os.path.basename(d)) for d in dest_paths]
del dest_paths
# start run_pyrate copy
ifgs = pre_prepare_ifgs(copied_dest_paths, params)
mst_grid = tests.common.mst_calculation(copied_dest_paths, params)
refx, refy = pyrate.core.refpixel.ref_pixel_calc_wrapper(params)
params[C.REFX] = refx
params[C.REFY] = refy
params[C.ORBFIT_OFFSET] = True
# Estimate and remove orbit errors
pyrate.core.orbital.remove_orbital_error(ifgs, params)
ifgs = prepare_ifgs_without_phase(copied_dest_paths, params)
for ifg in ifgs:
ifg.close()
correct._update_params_with_tiles(params)
_, ifgs = pyrate.core.ref_phs_est.ref_phase_est_wrapper(params)
ifgs[0].open()
r_dist = vcm_module.RDist(ifgs[0])()
ifgs[0].close()
maxvar = [vcm_module.cvd(i, params, r_dist)[0] for i in copied_dest_paths]
for ifg in ifgs:
ifg.open()
vcmt = vcm_module.get_vcmt(ifgs, maxvar)
for ifg in ifgs:
ifg.close()
ifg.open()
# Calculate stacked rate map
params[C.PARALLEL] = 1
cls.rate, cls.error, cls.samples = tests.common.calculate_stack_rate(ifgs, params, vcmt, mst_mat=mst_grid)
# Calculate stacked rate map
params[C.PARALLEL] = 0
cls.rate_s, cls.error_s, cls.samples_s = tests.common.calculate_stack_rate(ifgs, params, vcmt, mst_mat=mst_grid)
stackrate_dir = os.path.join(SML_TEST_DIR, 'stackrate')
cls.rate_container = np.genfromtxt(os.path.join(stackrate_dir, 'stackmap.csv'), delimiter=',')
cls.error_container = np.genfromtxt(os.path.join(stackrate_dir, 'errormap.csv'), delimiter=',')
cls.samples_container = np.genfromtxt(os.path.join(stackrate_dir, 'coh_sta.csv'), delimiter=',')
for ifg in ifgs:
ifg.close()
cls.params = params
@classmethod
def teardown_class(cls):
shutil.rmtree(cls.params[C.OUT_DIR])
def test_stack_rate_full_parallel(self):
"""
python multiprocessing by rows vs serial
"""
assert_array_almost_equal(self.rate, self.rate_s, decimal=3)
def test_stackrate_error_parallel(self):
"""
python multiprocessing by rows vs serial
"""
assert_array_almost_equal(self.error, self.error_s, decimal=3)
def test_stackrate_samples_parallel(self):
"""
python multiprocessing by rows vs serial
"""
assert_array_almost_equal(self.samples, self.samples_s, decimal=3)
def test_stack_rate(self):
"""
Compare with legacy data
"""
assert_array_almost_equal(self.rate_s, self.rate_container, decimal=3)
def test_stackrate_error(self):
"""
Compare with legacy data
"""
assert_array_almost_equal(self.error_s, self.error_container, decimal=3)
def test_stackrate_samples(self):
"""
Compare with legacy data
"""
assert_array_almost_equal(self.samples_s, self.samples_container, decimal=3)