Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • 9dcd810
  • /
  • test_pyrate.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge Iframe embedding
swh:1:cnt:65c71c682a3275ddc546bc87407f12ea0162f44d
directory badge Iframe embedding
swh:1:dir:9dcd810517f5ee788fdc353347bb3e57774a54b2

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
test_pyrate.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2020 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
This Python module contains system integration tests for the PyRate workflow.
"""
import glob
import os
import shutil
import tempfile
import pytest
from os.path import join
from pathlib import Path
import numpy as np

import pyrate.configuration
import pyrate.core.shared
import pyrate.main
from pyrate.core import shared, config as cf, config, prepifg_helper, mst
from pyrate.core.shared import dem_or_ifg
from pyrate import correct, prepifg, conv2tif
from pyrate.configuration import MultiplePaths, Configuration
from tests import common

# taken from
# http://stackoverflow.com/questions/6260149/os-symlink-support-in-windows
if os.name == "nt":
    def symlink_ms(source, link_name):
        import ctypes
        csl = ctypes.windll.kernel32.CreateSymbolicLinkW
        csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32)
        csl.restype = ctypes.c_ubyte
        flags = 1 if os.path.isdir(source) else 0
        try:
            if csl(link_name, source.replace('/', '\\'), flags) == 0:
                raise ctypes.WinError()
        except:
            pass
    os.symlink = symlink_ms

CURRENT_DIR = os.getcwd()


def test_transform_params():
    params = {config.IFG_LKSX: 3, config.IFG_LKSY: 2, config.IFG_CROP_OPT: 1}
    assert cf.transform_params(params) == (3, 2, 1)


def test_warp_required():
    nocrop = prepifg_helper.ALREADY_SAME_SIZE
    assert shared.warp_required(xlooks=2, ylooks=1, crop=nocrop)
    assert shared.warp_required(xlooks=1, ylooks=2, crop=nocrop)
    assert shared.warp_required(xlooks=1, ylooks=1, crop=nocrop)
    assert not shared.warp_required(xlooks=1, ylooks=1, crop=None)

    for c in prepifg_helper.CROP_OPTIONS[:-1]:
        assert shared.warp_required(xlooks=1, ylooks=1, crop=c)


def test_original_ifg_paths():
    ifgdir = common.SML_TEST_TIF
    ifglist_path = join(ifgdir, 'ifms_17')
    paths = cf.original_ifg_paths(ifglist_path, ifgdir)
    assert paths[0] == join(ifgdir, 'geo_060619-061002_unw.tif'), str(paths[0])
    assert paths[-1] == join(ifgdir, 'geo_070709-070813_unw.tif')


def dest_ifg_paths(ifg_paths, outdir):
    """
    Returns paths to out/dest ifgs.
    """

    bases = [os.path.basename(p) for p in ifg_paths]
    return [join(outdir, p) for p in bases]


def test_dest_ifg_paths():
    # given source ifgs to process, get paths of ifgs in out dir
    src_paths = ['tif/ifg0.tif', 'tif/ifg1.tif']
    dest_paths = dest_ifg_paths(src_paths, outdir='out')
    assert dest_paths == [os.path.join('out', i) for i in ['ifg0.tif',
                                                           'ifg1.tif']]


# FIXME: change to read output ifgs
def get_ifgs(out_dir, _open=True):
    paths = glob.glob(join(out_dir, 'geo_*-*_unw.tif'))
    ifgs = [shared.Ifg(p) for p in paths]
    assert len(ifgs) == 17, 'Got %s' % ifgs

    if _open:
        for i in ifgs:
            i.open(readonly=False)
    return ifgs


class TestPyRate:
    # Initialise & run workflow from class setup, ignoring multilooking as it is
    # a separate step. Unit tests verify different steps have completed

    @classmethod
    def setup_class(cls):

        # testing constants2
        cls.BASE_DIR = tempfile.mkdtemp()
        cls.BASE_OUT_DIR = join(cls.BASE_DIR, 'out')
        cls.BASE_DEM_DIR = join(cls.BASE_DIR, 'dem')
        cls.BASE_DEM_FILE = join(cls.BASE_DEM_DIR, 'roipac_test_trimmed.tif')

        try:
            # copy source data (treat as prepifg already run)
            os.makedirs(cls.BASE_OUT_DIR)
            for path in glob.glob(join(common.SML_TEST_TIF, '*')):
                dest = join(cls.BASE_OUT_DIR, os.path.basename(path))
                shutil.copy(path, dest)
                os.chmod(dest, 0o660)

            os.makedirs(cls.BASE_DEM_DIR)
            orig_dem = common.SML_TEST_DEM_TIF
            os.symlink(orig_dem, cls.BASE_DEM_FILE)
            os.chdir(cls.BASE_DIR)

            # Turn off validation because we're in a different working dir
            #  and relative paths in config won't be work.
            params = config.get_config_params(common.TEST_CONF_ROIPAC)
            params['correct'] = ['orbfit', 'refphase', 'mst', 'apscorrect', 'maxvar']
            params[cf.OUT_DIR] = cls.BASE_OUT_DIR
            params[cf.PROCESSOR] = 0  # roipac
            params[cf.APS_CORRECTION] = 0
            paths = glob.glob(join(cls.BASE_OUT_DIR, 'geo_*-*.tif'))
            paths = sorted(paths)
            params[cf.PARALLEL] = False
            params[cf.ORBFIT_OFFSET] = True
            params[cf.TEMP_MLOOKED_DIR] = cls.BASE_OUT_DIR.join(cf.TEMP_MLOOKED_DIR)
            params[cf.INTERFEROGRAM_FILES] = [MultiplePaths(p, params) for p in paths]
            for p in params[cf.INTERFEROGRAM_FILES]:  # cheat
                p.sampled_path = p.converted_path
                p.tmp_sampled_path = p.converted_path
            params["rows"], params["cols"] = 2, 2
            params[cf.REF_PIXEL_FILE] = Configuration.ref_pixel_path(params)
            Path(params[cf.OUT_DIR]).joinpath(cf.APS_ERROR_DIR).mkdir(exist_ok=True, parents=True)
            Path(params[cf.OUT_DIR]).joinpath(cf.MST_DIR).mkdir(exist_ok=True, parents=True)
            correct.correct_ifgs(params)

            if not hasattr(cls, 'ifgs'):
                cls.ifgs = get_ifgs(out_dir=cls.BASE_OUT_DIR)
        except:
            # revert working dir & avoid paths busting other tests
            os.chdir(CURRENT_DIR)
            raise

    @classmethod
    def teardown_class(cls):
        shutil.rmtree(cls.BASE_DIR, ignore_errors=True)
        os.chdir(CURRENT_DIR)

    def key_check(self, ifg, key, value):
        'Helper to check for metadata flags'
        md = ifg.dataset.GetMetadata()
        assert key in md, 'Missing %s in %s' % (key, ifg.data_path)
        assert md[key] == value

    def test_basic_outputs(self):
        assert os.path.exists(self.BASE_OUT_DIR)

        for i in self.ifgs:
            assert ~i.is_read_only

        # log_path = self.get_logfile_path()
        # st = os.stat(log_path)
        # self.assertTrue(st.st_size > 0)

    def test_phase_conversion(self):
        # ensure phase has been converted from radians to millimetres
        key = 'DATA_UNITS'
        value = 'MILLIMETRES'

        for i in self.ifgs:
            self.key_check(i, key, value)

    def test_orbital_correction(self):
        key = 'ORBITAL_ERROR'
        value = 'REMOVED'

        for i in self.ifgs:
            self.key_check(i, key, value)


@pytest.mark.slow
class TestParallelPyRate:
    """
    parallel vs serial pyrate tests verifying results from all steps equal
    """

    @classmethod
    def setup_class(cls):
        gamma_conf = common.TEST_CONF_GAMMA
        from tests.common import manipulate_test_conf
        rate_types = ['stack_rate', 'stack_error', 'stack_samples']
        cls.tif_dir = Path(tempfile.mkdtemp())
        params = manipulate_test_conf(gamma_conf, cls.tif_dir)

        from pyrate.configuration import Configuration
        # change the required params
        # params[cf.OBS_DIR] = common.SML_TEST_GAMMA
        params[cf.PROCESSES] = 4
        params[cf.PROCESSOR] = 1  # gamma
        params[cf.IFG_FILE_LIST] = os.path.join(common.SML_TEST_GAMMA, 'ifms_17')
        params[cf.PARALLEL] = 1
        params[cf.APS_CORRECTION] = 0
        params[cf.REFX], params[cf.REFY] = -1, -1
        rows, cols = params["rows"], params["cols"]

        output_conf_file = 'gamma.conf'
        output_conf = cls.tif_dir.joinpath(output_conf_file).as_posix()
        pyrate.configuration.write_config_file(params=params, output_conf_file=output_conf)

        params = Configuration(output_conf).__dict__

        from subprocess import check_call
        check_call(f"pyrate conv2tif -f {output_conf}", shell=True)
        check_call(f"pyrate prepifg -f {output_conf}", shell=True)

        cls.sampled_paths = [p.tmp_sampled_path for p in params[cf.INTERFEROGRAM_FILES]]

        ifgs = common.small_data_setup()
        correct._copy_mlooked(params)
        tiles = pyrate.core.shared.get_tiles(cls.sampled_paths[0], rows, cols)
        correct.correct_ifgs(params)
        pyrate.main.timeseries(params)
        pyrate.main.stack(params)
        cls.refpixel_p, cls.maxvar_p, cls.vcmt_p = \
            (params[cf.REFX], params[cf.REFY]), params[cf.MAXVAR], params[cf.VCMT]
        cls.mst_p = common.reconstruct_mst(ifgs[0].shape, tiles, params[cf.OUT_DIR])
        cls.rate_p, cls.error_p, cls.samples_p = \
            [common.reconstruct_stack_rate(ifgs[0].shape, tiles, params[cf.TMPDIR], t) for t in rate_types]
        
        common.remove_tifs(params[cf.OBS_DIR])

        # now create the non parallel version
        cls.tif_dir_s = Path(tempfile.mkdtemp())
        params = manipulate_test_conf(gamma_conf, cls.tif_dir_s)
        params[cf.PROCESSES] = 4
        params[cf.PROCESSOR] = 1  # gamma
        params[cf.IFG_FILE_LIST] = os.path.join(common.SML_TEST_GAMMA, 'ifms_17')
        params[cf.PARALLEL] = 0
        params[cf.APS_CORRECTION] = 0
        params[cf.REFX], params[cf.REFY] = -1, -1
        output_conf_file = 'gamma.conf'
        output_conf = cls.tif_dir_s.joinpath(output_conf_file).as_posix()
        pyrate.configuration.write_config_file(params=params, output_conf_file=output_conf)
        params = Configuration(output_conf).__dict__

        check_call(f"pyrate conv2tif -f {output_conf}", shell=True)
        check_call(f"pyrate prepifg -f {output_conf}", shell=True)

        correct._copy_mlooked(params)
        correct.correct_ifgs(params)
        pyrate.main.timeseries(params)
        pyrate.main.stack(params)
        cls.refpixel, cls.maxvar, cls.vcmt = \
            (params[cf.REFX], params[cf.REFY]), params[cf.MAXVAR], params[cf.VCMT]
        cls.mst = common.reconstruct_mst(ifgs[0].shape, tiles, params[cf.OUT_DIR])
        cls.rate, cls.error, cls.samples = \
            [common.reconstruct_stack_rate(ifgs[0].shape, tiles, params[cf.TMPDIR], t) for t in rate_types]
        cls.params = params

    @classmethod
    def teardown_class(cls):
        shutil.rmtree(cls.params[cf.OUT_DIR])

    @pytest.mark.slow
    def test_orbital_correction(self):
        key = 'ORBITAL_ERROR'
        value = 'REMOVED'
        ifgs = [dem_or_ifg(i) for i in self.sampled_paths]
        for i in ifgs:
            i.open()
            i.nodata_value = 0
            self.key_check(i, key, value)

    def key_check(self, ifg, key, value):
        'Helper to check for metadata flags'
        md = ifg.dataset.GetMetadata()
        assert key in md, 'Missing %s in %s' % (key, ifg.data_path)
        assert md[key] == value

    @pytest.mark.slow
    def test_phase_conversion(self):
        # ensure phase has been converted from radians to millimetres
        key = 'DATA_UNITS'
        value = 'MILLIMETRES'
        ifgs = [dem_or_ifg(i) for i in self.sampled_paths]

        for i in ifgs:
            i.open()
            i.nodata_value = 0
            self.key_check(i, key, value)

    @pytest.mark.slow
    def test_mst_equal(self):
        np.testing.assert_array_equal(self.mst, self.mst_p)

    def test_refpixel_equal(self):
        np.testing.assert_array_equal(self.refpixel, self.refpixel_p)

    def test_maxvar_equal(self):
        np.testing.assert_array_almost_equal(self.maxvar, self.maxvar_p, decimal=4)

    def test_vcmt_equal(self):
        np.testing.assert_array_almost_equal(self.vcmt, self.vcmt_p, decimal=4)

    def test_stack_rate_equal(self):
        np.testing.assert_array_almost_equal(self.rate, self.rate_p, decimal=4)
        np.testing.assert_array_almost_equal(self.error, self.error_p, decimal=4)
        np.testing.assert_array_almost_equal(self.samples, self.samples_p, decimal=4)


class TestPrePrepareIfgs:

    @classmethod
    def setup_class(cls):
        params = config.get_config_params(common.TEST_CONF_ROIPAC)
        cls.tmp_dir = tempfile.mkdtemp()
        common.copytree(common.SML_TEST_TIF, cls.tmp_dir)
        tifs = glob.glob(os.path.join(cls.tmp_dir, "*.tif"))
        for t in tifs:
            os.chmod(t, 0o644)
        small_ifgs = common.small_data_setup(datafiles=tifs)
        ifg_paths = [i.data_path for i in small_ifgs]

        cls.ifg_ret = common.pre_prepare_ifgs(ifg_paths, params=params)
        for i in cls.ifg_ret:
            i.close()

        nan_conversion = params[cf.NAN_CONVERSION]

        # prepare a second set
        cls.tmp_dir2 = tempfile.mkdtemp()
        common.copytree(common.SML_TEST_TIF, cls.tmp_dir2)
        tifs = glob.glob(os.path.join(cls.tmp_dir2, "*.tif"))
        for t in tifs:
            os.chmod(t, 0o644)
        small_ifgs = common.small_data_setup(datafiles=tifs)
        ifg_paths = [i.data_path for i in small_ifgs]

        cls.ifgs = [shared.Ifg(p) for p in ifg_paths]

        for i in cls.ifgs:
            if not i.is_open:
                i.open(readonly=False)
            if nan_conversion:  # nan conversion happens here in networkx mst
                i.nodata_value = params[cf.NO_DATA_VALUE]
                i.convert_to_nans()
            if not i.mm_converted:
                i.convert_to_mm()
            i.close()

    @classmethod
    def teardown_class(cls):
        shutil.rmtree(cls.tmp_dir2)
        shutil.rmtree(cls.tmp_dir)

    @pytest.mark.slow
    def test_small_data_prep_phase_equality(self):
        for i, j in zip(self.ifgs, self.ifg_ret):
            np.testing.assert_array_almost_equal(i.phase_data, j.phase_data)
            assert ~(i.phase_data == 0).any()
            # if there was any 0 still present
            i.phase_data[4, 2] = 0
            assert (i.phase_data == 0).any()

    @pytest.mark.slow
    def test_small_data_prep_metadata_equality(self):
        for i, j in zip(self.ifgs, self.ifg_ret):
            assert i.meta_data == j.meta_data

back to top

Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API