Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • cbdbb26
  • /
  • test_timeseries.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge Iframe embedding
swh:1:cnt:08b895872be2043b8642ad2032ab219de07d4951
directory badge Iframe embedding
swh:1:dir:cbdbb26ae74738c98efa42d41469138b9a20b5ce

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
test_timeseries.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2020 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
# coding: utf-8
"""
This Python module contains tests for the timeseries.py PyRate module.
"""
import os
import shutil
from copy import deepcopy
import pytest
from datetime import date, timedelta
from numpy import nan, asarray, where, array
import numpy as np
from numpy.testing import assert_array_almost_equal

import pyrate.constants as C
import pyrate.core.orbital
import pyrate.core.prepifg_helper
import pyrate.core.ref_phs_est
import pyrate.core.refpixel
import tests.common as common
from pyrate.core import mst, covariance
from pyrate import correct, prepifg, conv2tif
from pyrate.configuration import Configuration
from pyrate.core.timeseries import time_series, linear_rate_pixel, linear_rate_array, TimeSeriesError


def default_params():
    return {C.TIME_SERIES_METHOD: 1,
            C.TIME_SERIES_PTHRESH: 0,
            C.TIME_SERIES_SM_ORDER: 2,
            C.TIME_SERIES_SM_FACTOR: -0.25,
            C.PARALLEL: 0,
            C.PROCESSES: 1,
            C.NAN_CONVERSION: 1,
            C.NO_DATA_VALUE: 0}


class SinglePixelIfg(object):
    """
    A single pixel ifg (interferogram) solely for unit testing
    """

    def __init__(self, first, second, phase, nan_fraction):
        self.phase_data = asarray([[phase]])
        self.first = first
        self.second = second
        self.nrows = 1
        self.ncols = 1
        self.nan_fraction = asarray([nan_fraction])

    def convert_to_nans(self, val=0):
        """
        Converts given values in phase data to NaNs
        val - value to convert, default is 0
        """
        self.phase_data = where(self.phase_data == val, nan, self.phase_data)
        self.nan_converted = True


class TestTimeSeries:
    """Verifies error checking capabilities of the time_series function"""

    @classmethod
    def setup_class(cls):
        cls.ifgs = common.small_data_setup()
        cls.params = default_params()
        cls.mstmat = mst.mst_boolean_array(cls.ifgs)
        r_dist = covariance.RDist(cls.ifgs[0])()
        cls.maxvar = [covariance.cvd(i, cls.params, r_dist)[0]
                      for i in cls.ifgs]
        cls.vcmt = covariance.get_vcmt(cls.ifgs, cls.maxvar)

    def test_time_series_unit(self):
        """
        Checks that the code works the same as the calculated example
        """
        ifirst = asarray([1, 1, 2, 2, 3, 3, 4, 5])
        isecond = asarray([2, 4, 3, 4, 5, 6, 6, 6])
        timeseries = asarray([0.0, 0.1, 0.6, 0.8, 1.1, 1.3])
        phase = asarray([0.5, 4, 2.5, 3.5, 2.5, 3.5, 2.5, 1])
        nan_fraction = asarray([0.5, 0.4, 0.2, 0.3, 0.1, 0.3, 0.2, 0.1])

        now = date.today()

        dates = [now + timedelta(days=(t*365.25)) for t in timeseries]
        dates.sort()
        first = [dates[m_num - 1] for m_num in ifirst]
        second = [dates[s_num - 1] for s_num in isecond]

        self.ifgs = [SinglePixelIfg(m, s, p, n) for m, s, p, n in
                     zip(first, second, phase, nan_fraction)]

        tsincr, tscum, tsvel = time_series(
            self.ifgs, params=self.params, vcmt=self.vcmt, mst=None)
        expected = asarray([[[0.50, 3.0, 4.0, 5.5, 6.5]]])
        assert_array_almost_equal(tscum, expected, decimal=2)


class TestLegacyTimeSeriesEquality:

    @classmethod
    def setup_class(cls):
        params = Configuration(common.TEST_CONF_ROIPAC).__dict__
        params[C.TEMP_MLOOKED_DIR] = os.path.join(params[C.OUT_DIR],
                                                                 C.TEMP_MLOOKED_DIR)
        conv2tif.main(params)
        prepifg.main(params)

        params[C.REF_EST_METHOD] = 2

        xlks, _, crop = pyrate.core.prepifg_helper.transform_params(params)

        dest_paths, headers = common.repair_params_for_correct_tests(params[C.INTERFEROGRAM_DIR], params)
        correct._copy_mlooked(params)
        copied_dest_paths = [os.path.join(params[C.TEMP_MLOOKED_DIR], os.path.basename(d)) for d in dest_paths]
        del dest_paths
        # start run_pyrate copy
        ifgs = common.pre_prepare_ifgs(copied_dest_paths, params)
        mst_grid = common.mst_calculation(copied_dest_paths, params)
        refx, refy = pyrate.core.refpixel.ref_pixel_calc_wrapper(params)

        params[C.REFX] = refx
        params[C.REFY] = refy
        params[C.ORBFIT_OFFSET] = True
        # Estimate and remove orbit errors
        pyrate.core.orbital.remove_orbital_error(ifgs, params)
        ifgs = common.prepare_ifgs_without_phase(copied_dest_paths, params)
        for ifg in ifgs:
            ifg.close()
        correct._update_params_with_tiles(params)
        _, ifgs = pyrate.core.ref_phs_est.ref_phase_est_wrapper(params)
        ifgs[0].open()
        r_dist = covariance.RDist(ifgs[0])()
        ifgs[0].close()
        maxvar = [covariance.cvd(i, params, r_dist)[0] for i in copied_dest_paths]
        for ifg in ifgs:
            ifg.open()
        vcmt = covariance.get_vcmt(ifgs, maxvar)

        for ifg in ifgs:
            ifg.close()
            ifg.open()
            ifg.nodata_value = 0.0

        params[C.TIME_SERIES_METHOD] = 1
        params[C.PARALLEL] = 0
        # Calculate time series
        cls.tsincr_0, cls.tscum_0, _ = common.calculate_time_series(ifgs, params, vcmt, mst=mst_grid)

        params[C.PARALLEL] = 1
        cls.tsincr_1, cls.tscum_1, cls.tsvel_1 = common.calculate_time_series(ifgs, params, vcmt, mst=mst_grid)

        # load the legacy data
        ts_dir = os.path.join(common.SML_TEST_DIR, 'time_series')
        tsincr_path = os.path.join(ts_dir, 'ts_incr_interp0_method1.csv')
        ts_incr = np.genfromtxt(tsincr_path)

        tscum_path = os.path.join(ts_dir, 'ts_cum_interp0_method1.csv')
        ts_cum = np.genfromtxt(tscum_path)
        cls.ts_incr = np.reshape(ts_incr, newshape=cls.tsincr_0.shape, order='F')
        cls.ts_cum = np.reshape(ts_cum, newshape=cls.tscum_0.shape, order='F')
        cls.params = params

    @classmethod
    def teardown_class(cls):
        shutil.rmtree(cls.params[C.OUT_DIR])

    def test_time_series_equality_parallel_by_rows(self):
        """
        check time series parallel by rows jobs
        """

        self.assertEqual(self.tsincr_1.shape, self.tscum_1.shape)
        self.assertEqual(self.tsvel_1.shape, self.tsincr_1.shape)

        np.testing.assert_array_almost_equal(
            self.ts_incr, self.tsincr_1, decimal=3)

        np.testing.assert_array_almost_equal(
            self.ts_cum, self.tscum_1, decimal=3)

    def test_time_series_equality_serial_by_the_pixel(self):
        """
        check time series
        """

        self.assertEqual(self.tsincr_0.shape, self.tscum_0.shape)

        np.testing.assert_array_almost_equal(
            self.ts_incr, self.tsincr_0, decimal=3)

        np.testing.assert_array_almost_equal(
            self.ts_cum, self.tscum_0, decimal=3)

    @staticmethod
    def assertEqual(val1, val2):
        assert val1 == val2


class TestLegacyTimeSeriesEqualityMethod2Interp0:

    @classmethod
    def setup_class(cls):
        params = Configuration(common.TEST_CONF_ROIPAC).__dict__
        params[C.TEMP_MLOOKED_DIR] = os.path.join(params[C.OUT_DIR],
                                                                 C.TEMP_MLOOKED_DIR)
        conv2tif.main(params)
        prepifg.main(params)

        params[C.REF_EST_METHOD] = 2

        xlks, _, crop = pyrate.core.prepifg_helper.transform_params(params)

        dest_paths, headers = common.repair_params_for_correct_tests(params[C.INTERFEROGRAM_DIR], params)
        correct._copy_mlooked(params)
        copied_dest_paths = [os.path.join(params[C.TEMP_MLOOKED_DIR], os.path.basename(d)) for d in dest_paths]
        del dest_paths
        # start run_pyrate copy
        ifgs = common.pre_prepare_ifgs(copied_dest_paths, params)
        mst_grid = common.mst_calculation(copied_dest_paths, params)
        refx, refy = pyrate.core.refpixel.ref_pixel_calc_wrapper(params)

        params[C.REFX] = refx
        params[C.REFY] = refy
        params[C.ORBFIT_OFFSET] = True

        # Estimate and remove orbit errors
        pyrate.core.orbital.remove_orbital_error(ifgs, params)
        ifgs = common.prepare_ifgs_without_phase(copied_dest_paths, params)
        for ifg in ifgs:
            ifg.close()

        correct._update_params_with_tiles(params)
        _, ifgs = pyrate.core.ref_phs_est.ref_phase_est_wrapper(params)
        ifgs[0].open()
        r_dist = covariance.RDist(ifgs[0])()
        ifgs[0].close()
        # Calculate interferogram noise
        maxvar = [covariance.cvd(i, params, r_dist)[0] for i in copied_dest_paths]
        for ifg in ifgs:
            ifg.open()
        vcmt = covariance.get_vcmt(ifgs, maxvar)
        for ifg in ifgs:
            ifg.close()
            ifg.open()
            ifg.nodata_value = 0.0

        params[C.TIME_SERIES_METHOD] = 2
        params[C.PARALLEL] = 1
        # Calculate time series
        cls.tsincr, cls.tscum, _ = common.calculate_time_series(ifgs, params, vcmt, mst=mst_grid)

        params[C.PARALLEL] = 0
        # Calculate time series serailly by the pixel
        cls.tsincr_0, cls.tscum_0, _ = common.calculate_time_series(ifgs, params, vcmt, mst=mst_grid)

        # copy legacy data
        SML_TIME_SERIES_DIR = os.path.join(common.SML_TEST_DIR, 'time_series')
        tsincr_path = os.path.join(SML_TIME_SERIES_DIR, 'ts_incr_interp0_method2.csv')
        ts_incr = np.genfromtxt(tsincr_path)

        tscum_path = os.path.join(SML_TIME_SERIES_DIR, 'ts_cum_interp0_method2.csv')
        ts_cum = np.genfromtxt(tscum_path)

        cls.ts_incr = np.reshape(ts_incr, newshape=cls.tsincr_0.shape, order='F')
        cls.ts_cum = np.reshape(ts_cum, newshape=cls.tscum_0.shape, order='F')
        cls.params = params

    @classmethod
    def teardown_class(cls):
        shutil.rmtree(cls.params[C.OUT_DIR])

    def test_time_series_equality_parallel_by_rows(self):

        assert self.tsincr.shape == self.tscum.shape

        np.testing.assert_array_almost_equal(self.ts_incr, self.tsincr, decimal=1)

        np.testing.assert_array_almost_equal(self.ts_cum, self.tscum, decimal=1)

    def test_time_series_equality_serial_by_the_pixel(self):

        assert self.tsincr_0.shape == self.tscum_0.shape

        np.testing.assert_array_almost_equal(self.ts_incr, self.tsincr_0, decimal=3)

        np.testing.assert_array_almost_equal(self.ts_cum, self.tscum_0, decimal=3)


class TestLinearRatePixel:
    """
    Tests the linear regression algorithm for determining the best
    fitting velocity from a cumulative time series
    """

    def test_linear_rate_pixel_clean(self):
        y = array([0, 2, 4, 6, 8, 10])
        t = array([0, 1, 2, 3, 4, 5])
        exp = (2.0, 0.0, 1.0, 0.0, 6)
        res = linear_rate_pixel(y, t)
        assert res == exp

    def test_linear_rate_pixel_neg_rate(self):
        y = array([0, -2, -4, -6, -8, -10])
        t = array([0, 1, 2, 3, 4, 5])
        exp = (-2.0, 0.0, 1.0, 0.0, 6)
        res = linear_rate_pixel(y, t)
        assert res == exp

    def test_linear_rate_pixel_outlier(self):
        y = array([0, 2, 4, 6, 8, 20])
        t = array([0, 1, 2, 3, 4, 5])
        exp = (3.428571, -1.904761, 0.812030, 0.824786, 6)
        res = linear_rate_pixel(y, t)
        assert res == pytest.approx(exp, rel=1e-6)

    def test_linear_rate_pixel_noise(self):
        y = array([0, 2, 4, 6, 8, 10])
        r = y + np.random.rand(6) # add different uniform noise each time
        t = array([0, 1, 2, 3, 4, 5])
        exprate = 2.0
        explsqd = 1.0
        experr = 0.0
        rate, _, lsqd, err, _ = linear_rate_pixel(y, t)
        assert exprate == pytest.approx(rate, rel=1e-1)
        assert explsqd == pytest.approx(lsqd, rel=1e-1)
        assert experr == pytest.approx(err, rel=1e-1)

    def test_linear_rate_pixel_exception(self):
        # input vectors should be equal length
        y = array([2, 4, 6, 8, 10])
        t = array([0, 1, 2, 3, 4, 5])
        with pytest.raises(TimeSeriesError):
            res = linear_rate_pixel(y, t)

    def test_linear_rate_pixel_nans(self):
        # at least two obs are required for line fitting
        y = array([0, nan, nan, nan, nan, nan])
        t = array([0, 1, 2, 3, 4, 5])
        exp = (nan, nan, nan, nan, nan)
        res = linear_rate_pixel(y, t)
        assert res == exp


class TestLinearRateArray:
    """
    Tests the array loop wrapper for the linear regression algorithm using real data
    """

    @classmethod
    @pytest.fixture(autouse=True)
    def setup_class(cls, roipac_params):
        cls.params = roipac_params
        cls.ifgs = common.small_data_setup()

        # read in input (tscuml) and expected output arrays
        tscuml_path = os.path.join(common.SML_TEST_LINRATE, "tscuml_0.npy")
        cls.tscuml0 = np.load(tscuml_path)
        # add zero epoch to tscuml 3D array
        cls.tscuml = np.insert(cls.tscuml0, 0, 0, axis=2) 

        linrate_path = os.path.join(common.SML_TEST_LINRATE, "linear_rate.npy")
        cls.linrate = np.load(linrate_path)

        error_path = os.path.join(common.SML_TEST_LINRATE, "linear_error.npy")
        cls.error = np.load(error_path)

        icpt_path = os.path.join(common.SML_TEST_LINRATE, "linear_intercept.npy")
        cls.icpt = np.load(icpt_path)

        samp_path = os.path.join(common.SML_TEST_LINRATE, "linear_samples.npy")
        cls.samp = np.load(samp_path)

        rsq_path = os.path.join(common.SML_TEST_LINRATE, "linear_rsquared.npy")
        cls.rsq = np.load(rsq_path)

    def test_linear_rate_array(self):
        """
        Input and expected output are on disk. This test only tests the linear_rate_array
        and linear_rate_pixel functions using real data.
        """
        l, i, r, e, s = linear_rate_array(self.tscuml, self.ifgs, self.params)
        # test to 20 decimal places
        assert_array_almost_equal(self.linrate, l, 1e-20)
        assert_array_almost_equal(self.icpt, i, 1e-20)
        assert_array_almost_equal(self.rsq, r, 1e-20)
        assert_array_almost_equal(self.error, e, 1e-20)
        assert_array_almost_equal(self.samp, s, 1e-20)

    def test_linear_rate_array_two_sigma(self):
        """
        Check that the "nsigma" switch in the config dictionary
        actually results in a change in the error map.
        """
        # make a deep copy of the params dict to avoid changing
        # state for other tests if this one fails
        params = deepcopy(self.params)
        params[C.VELERROR_NSIG] = 2
        _, _, _, e, _ = linear_rate_array(self.tscuml, self.ifgs, params)
        assert_array_almost_equal(self.error*2, e, 1e-20)

    def test_linear_rate_array_exception(self):
        # depth of tscuml should equal nepochs
        with pytest.raises(TimeSeriesError):
            res = linear_rate_array(self.tscuml0, self.ifgs, self.params)

back to top

Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API