Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • cbdbb26
  • /
  • test_aps.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge Iframe embedding
swh:1:cnt:029b71189dec5817fd98518f6f2e9ce413846cb5
directory badge Iframe embedding
swh:1:dir:cbdbb26ae74738c98efa42d41469138b9a20b5ce

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
test_aps.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2020 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
This Python module contains tests for the APS PyRate module.
"""
import os
import shutil
import pytest
import numpy as np
from os.path import join
from scipy.ndimage import gaussian_filter1d, gaussian_filter
from numpy.testing import assert_array_almost_equal

import pyrate.constants as C
from pyrate import conv2tif, prepifg, correct
from pyrate.configuration import Configuration, MultiplePaths
from pyrate.core import shared
from pyrate.core.aps import wrap_spatio_temporal_filter, _interpolate_nans_2d, _kernel
from pyrate.core.aps import gaussian_temporal_filter as tlpfilter, gaussian_spatial_filter as slpfilter
from pyrate.core.shared import Ifg
from pyrate.core.ifgconstants import DAYS_PER_YEAR
from tests import common
from tests.common import BASE_TEST, PY37GDAL304


@pytest.fixture(params=["linear", "nearest", "cubic"])
def slpnanfill_method(request):
    return request.param


@pytest.fixture(params=[0.1, 0.5, 1, 5])
def slpfcutoff_method(request):
    return request.param


@pytest.fixture(params=[0.001, 0.01])
def slpfcutoff(request):
    return request.param


def test_interpolate_nans_2d(slpnanfill_method):
    arr = np.random.rand(200, 100)
    arr[arr < 0.1] = np.nan  # insert some nans
    assert np.sum(np.isnan(arr)) != 0  # some nans present
    _interpolate_nans_2d(arr, method=slpnanfill_method)
    assert np.sum(np.isnan(arr)) == 0  # should not be any nans


class TestSpatialFilter:
    """
    Test the implementation of Gaussian spatial filter
    """

    def setup_method(self):
        ifg_path = join(str(BASE_TEST), 'cropB', '20180106-20180130_ifg.tif')
        ifg = Ifg(ifg_path)
        ifg.open()
        p = ifg.phase_data
        self.x_size = ifg.x_size
        self.y_size = ifg.y_size
        # convert zeros to NaNs
        p[p == 0] = np.nan
        self.phase = p

    def test_gaussian_filter(self, slpfcutoff_method):
        """
        Compare against scipy.ndimage.gaussian_filter,
        that operates in the spatial domain on data with equal resolution in x and y
        """
        e = gaussian_filter(self.phase, sigma=slpfcutoff_method)
        r = slpfilter(self.phase, cutoff=slpfcutoff_method,
                      x_size=self.x_size, y_size=self.y_size)
        # pull data from middle of images; away from potential edge effects
        exp = e[50:150, 50:150]
        res = r[50:150, 50:150]
        assert_array_almost_equal(res, exp, 0)


def test_gaussian_kernel():
    """
    Test the Gaussian smoothing kernel
    """
    x = np.arange(1, 10, 2)
    res = _kernel(x, 3)
    exp = np.array([0.94595947, 0.60653066, 0.24935221, 0.06572853, 0.011109])
    np.testing.assert_array_almost_equal(res, exp, decimal=6)

    res = _kernel(x, 4)
    exp = np.array([0.96923323, 0.7548396, 0.45783336, 0.21626517, 0.07955951])
    np.testing.assert_array_almost_equal(res, exp, decimal=6)

    res = _kernel(x, 0.001)
    exp = np.array([0, 0, 0, 0, 0])
    np.testing.assert_array_equal(res, exp)


class TestTemporalFilter:
    """
    Tests for the temporal filter with synthetic data for a single pixel
    """

    def setup_method(self):
        self.thr = 1  # no nans in these test cases, threshold = 1
        # instance of normally distributed noise
        n = np.array([-0.36427456, 0.69539061, 0.42181139, -2.56306134,
                      0.55844095, -0.65562626, 0.65607911, 1.19431637,
                      -1.43837395, -0.91656358])
        # synthetic incremental displacement
        d = np.array([1., 1., 0.7, 0.3, 0., 0.1, 0.2, 0.6, 1., 1.])
        # incremental displacement + noise
        self.tsincr = d * 2 + n
        # regular time series, every 12 days
        self.interval = 12 / DAYS_PER_YEAR  # 0.03285 years
        intv = np.ones(d.shape, dtype=np.float32) * self.interval
        self.span = np.cumsum(intv)

    def test_tlpfilter_repeatability(self):
        """
        TEST 1: check for repeatability against expected result from
        tlpfilter. Cutoff equal to sampling interval (sigma=1)
        """
        res = tlpfilter(self.tsincr, self.interval, self.span, self.thr)
        exp = np.array([1.9936507, 1.9208364, 1.0252733, -0.07402889,
                        -0.1842336, 0.24325351, 0.94737214, 1.3890865,
                        1.1903466, 1.0036403])
        np.testing.assert_array_almost_equal(res, exp, decimal=6)

    def test_tlpfilter_scipy_sig1(self):
        """
        TEST 2: compare tlpfilter to scipy.ndimage.gaussian_filter1d. Works for
        regularly sampled data. Cutoff equal to sampling interval (sigma=1)
        """
        res = tlpfilter(self.tsincr, self.interval, self.span, self.thr)
        exp = gaussian_filter1d(self.tsincr, sigma=1)
        np.testing.assert_array_almost_equal(res, exp, decimal=1)

    def test_tlpfilter_scipy_sig2(self):
        """
        TEST 3: compare tlpfilter to scipy.ndimage.gaussian_filter1d. Works for
        regularly sampled data. Cutoff equal to twice the sampling interval (sigma=2)
        """
        res = tlpfilter(self.tsincr, self.interval * 2, self.span, self.thr)
        exp = gaussian_filter1d(self.tsincr, sigma=2)
        np.testing.assert_array_almost_equal(res, exp, decimal=1)

    def test_tlpfilter_scipy_sig05(self):
        """
        TEST 4: compare tlpfilter to scipy.ndimage.gaussian_filter1d. Works for
        regularly sampled data. Cutoff equal to half the sampling interval (sigma=0.5)
        """
        res = tlpfilter(self.tsincr, self.interval * 0.5, self.span, self.thr)
        exp = gaussian_filter1d(self.tsincr, sigma=0.5)
        np.testing.assert_array_almost_equal(res, exp, decimal=2)


@pytest.mark.slow
@pytest.mark.skipif(not PY37GDAL304, reason="Only run in one CI env")
class TestAPSErrorCorrectionsOnDiscReused:

    @classmethod
    def setup_method(cls):
        cls.conf = common.TEST_CONF_GAMMA
        params = Configuration(cls.conf).__dict__
        conv2tif.main(params)
        params = Configuration(cls.conf).__dict__
        prepifg.main(params)
        cls.params = Configuration(cls.conf).__dict__
        correct._copy_mlooked(cls.params)
        correct._update_params_with_tiles(cls.params)
        correct._create_ifg_dict(cls.params)
        multi_paths = cls.params[C.INTERFEROGRAM_FILES]
        cls.ifg_paths = [p.tmp_sampled_path for p in multi_paths]
        cls.ifgs = [shared.Ifg(i) for i in cls.ifg_paths]
        for i in cls.ifgs:
            i.open()
        shared.save_numpy_phase(cls.ifg_paths, cls.params)
        correct.mst_calc_wrapper(cls.params)

    @classmethod
    def teardown_method(cls):
        shutil.rmtree(cls.params[C.OUT_DIR])

    def test_aps_error_files_on_disc(self, slpnanfill_method, slpfcutoff):
        self.params[C.SLPF_NANFILL_METHOD] = slpnanfill_method
        self.params[C.SLPF_CUTOFF] = slpfcutoff
        wrap_spatio_temporal_filter(self.params)

        # test_orb_errors_written
        aps_error_files = [MultiplePaths.aps_error_path(i, self.params) for i in self.ifg_paths]
        assert all(p.exists() for p in aps_error_files)
        last_mod_times = [os.stat(o).st_mtime for o in aps_error_files]
        phase_prev = [i.phase_data for i in self.ifgs]

        # run aps error removal again
        wrap_spatio_temporal_filter(self.params)
        aps_error_files2 = [MultiplePaths.aps_error_path(i, self.params) for i in self.ifg_paths]
        # if files are written again - times will change
        last_mod_times_2 = [os.stat(o).st_mtime for o in aps_error_files2]
        # test_aps_error_reused_if_params_unchanged
        assert all(a == b for a, b in zip(last_mod_times, last_mod_times_2))
        phase_now = [i.phase_data for i in self.ifgs]

        # run aps error correction once mroe
        wrap_spatio_temporal_filter(self.params)
        aps_error_files3 = [MultiplePaths.aps_error_path(i, self.params) for i in self.ifg_paths]
        last_mod_times_3 = [os.stat(o).st_mtime for o in aps_error_files3]
        assert all(a == b for a, b in zip(last_mod_times, last_mod_times_3))
        phase_again = [i.phase_data for i in self.ifgs]
        np.testing.assert_array_equal(phase_prev, phase_now)
        np.testing.assert_array_equal(phase_prev, phase_again)

back to top

Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API