Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

https://github.com/GeoscienceAustralia/PyRate
09 August 2023, 08:52:18 UTC
  • Code
  • Branches (23)
  • Releases (1)
  • Visits
    • Branches
    • Releases
    • HEAD
    • refs/heads/CI-patch
    • refs/heads/data
    • refs/heads/dependabot/pip/joblib-1.2.0
    • refs/heads/dependabot/pip/numpy-1.22.0
    • refs/heads/dependabot/pip/scipy-1.10.0
    • refs/heads/develop
    • refs/heads/gh-pages
    • refs/heads/master
    • refs/heads/mg/actions
    • refs/heads/sb/largetifs-enhancements
    • refs/heads/sb/orbfit-independent-method
    • refs/heads/sb/orbital-correction-experiements
    • refs/heads/sb/phase-closure-correction
    • refs/heads/sb/upgrade-ci-ubuntu
    • refs/heads/sb/use-mpi-shared
    • refs/tags/0.3.0
    • refs/tags/0.4.0
    • refs/tags/0.4.1
    • refs/tags/0.4.2
    • refs/tags/0.4.3
    • refs/tags/0.5.0
    • refs/tags/0.6.0
    • refs/tags/0.6.1
    • 0.2.0
  • 94c4b86
  • /
  • tests
  • /
  • test_prepifg.py
Raw File Download
Take a new snapshot of a software origin

If the archived software origin currently browsed is not synchronized with its upstream version (for instance when new commits have been issued), you can explicitly request Software Heritage to take a new snapshot of it.

Use the form below to proceed. Once a request has been submitted and accepted, it will be processed as soon as possible. You can then check its processing state by visiting this dedicated page.
swh spinner

Processing "take a new snapshot" request ...

Permalinks

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
  • revision
  • snapshot
origin badgecontent badge Iframe embedding
swh:1:cnt:76e4062603b7d63bbb93580ac5111569d0e42610
origin badgedirectory badge Iframe embedding
swh:1:dir:77fc43e5fe61a5f09d099067a5d521c69aaf92ca
origin badgerevision badge
swh:1:rev:a1845a10c680eda4e34f93e8ab787ccfb367e546
origin badgesnapshot badge
swh:1:snp:e85aafb5fc900c1df2eebb773a8b8e11798084c1
Citations

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
  • revision
  • snapshot
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Tip revision: a1845a10c680eda4e34f93e8ab787ccfb367e546 authored by Matt Garthwaite on 21 October 2021, 00:43:59 UTC
Merge pull request #366 from sixy6e/j6-actions
Tip revision: a1845a1
test_prepifg.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2021 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
This Python module contains tests for the prepifg.py PyRate module.
"""
import shutil
import os
from os.path import exists, join
import sys
import subprocess
import tempfile
from math import floor
from itertools import product
from pathlib import Path
import pytest

import numpy as np
from numpy import isnan, nanmax, nanmin, nanmean, ones, nan, reshape, sum as npsum
from numpy.testing import assert_array_almost_equal, assert_array_equal

from osgeo import gdal

import pyrate.configuration
import pyrate.constants as C
from pyrate.core.logger import pyratelogger as log
from pyrate.core.shared import Ifg, DEM, dem_or_ifg
from pyrate.core.shared import InputTypes
from pyrate.core.prepifg_helper import CUSTOM_CROP, MAXIMUM_CROP, MINIMUM_CROP, ALREADY_SAME_SIZE
from pyrate.core import roipac
from pyrate.core.prepifg_helper import prepare_ifg, _resample, PreprocessError, CustomExts
from pyrate.core.prepifg_helper import get_analysis_extent
from pyrate.core import ifgconstants as ifc
from pyrate.configuration import Configuration, MultiplePaths
from pyrate import conv2tif, prepifg

from tests import common
from tests.common import SML_TEST_LEGACY_PREPIFG_DIR, PY37GDAL302, BASE_TEST, WORKING_DIR
from tests.common import PREP_TEST_TIF, PREP_TEST_OBS, MEXICO_CROPA_CONF, assert_two_dirs_equal
from tests.common import SML_TEST_DEM_TIF, SML_TEST_DEM_HDR, manipulate_test_conf, UnitTestAdaptation

gdal.UseExceptions()
DUMMY_SECTION_NAME = 'pyrate'

if not exists(PREP_TEST_TIF):
    sys.exit("ERROR: Missing 'prepifg' dir for unittests\n")


def test_prepifg_treats_inputs_and_outputs_read_only(gamma_conf, tempdir, coh_mask):
    tdir = Path(tempdir())
    params = common.manipulate_test_conf(gamma_conf, tdir)
    params[C.COH_MASK] = coh_mask
    output_conf = tdir.joinpath('conf.cfg')
    pyrate.configuration.write_config_file(params=params, output_conf_file=output_conf)

    params = Configuration(output_conf.as_posix()).__dict__
    conv2tif.main(params)

    tifs = list(Path(params[C.INTERFEROGRAM_DIR]).glob('*_unw.tif'))
    assert len(tifs) == 17

    params = Configuration(output_conf.as_posix()).__dict__
    prepifg.main(params)
    cropped_ifgs = list(Path(params[C.INTERFEROGRAM_DIR]).glob('*_ifg.tif'))
    cropped_cohs = list(Path(params[C.COHERENCE_DIR]).glob('*_coh.tif'))
    cropped_dem = list(Path(params[C.GEOMETRY_DIR]).glob('*_dem.tif'))

    if params[C.COH_FILE_LIST] is not None:  # 17 + 1 dem + 17 coh files
        assert len(cropped_ifgs) + len(cropped_cohs) + len(cropped_dem) == 35
    else:  # 17 + 1 dem
        assert len(cropped_ifgs) + len(cropped_cohs) + len(cropped_dem) == 18
    # check all tifs from conv2tif are still readonly
    for t in tifs:
        assert t.stat().st_mode == 33060

    # check all prepifg outputs are readonly
    for c in cropped_cohs + cropped_ifgs + cropped_dem:
        assert c.stat().st_mode == 33060


def test_prepifg_file_types(tempdir, gamma_conf, coh_mask):
    tdir = Path(tempdir())
    params = manipulate_test_conf(gamma_conf, tdir)
    params[C.COH_MASK] = coh_mask
    params[C.PARALLEL] = 0
    output_conf_file = 'conf.conf'
    output_conf = tdir.joinpath(output_conf_file)
    pyrate.configuration.write_config_file(params=params, output_conf_file=output_conf)
    params_s = Configuration(output_conf).__dict__
    conv2tif.main(params_s)
    # reread params from config
    params_s = Configuration(output_conf).__dict__
    prepifg.main(params_s)
    ifg_files = list(Path(tdir.joinpath(params_s[C.INTERFEROGRAM_DIR])).glob('*_unw.tif'))
    assert len(ifg_files) == 17
    mlooked_files = list(Path(tdir.joinpath(params_s[C.INTERFEROGRAM_DIR])).glob('*_ifg.tif'))
    assert len(mlooked_files) == 17
    coh_files = list(Path(tdir.joinpath(params_s[C.COHERENCE_DIR])).glob('*_cc.tif'))
    mlooked_coh_files = list(Path(tdir.joinpath(params_s[C.COHERENCE_DIR])).glob('*_coh.tif'))
    if coh_mask:
        assert len(coh_files) == 17
        assert len(mlooked_coh_files) == 17
    dem_file = list(Path(tdir.joinpath(params_s[C.GEOMETRY_DIR])).glob('*_dem.tif'))[0]
    mlooked_dem_file = list(Path(tdir.joinpath(params_s[C.GEOMETRY_DIR])).glob('dem.tif'))[0]
    import itertools

    # assert coherence and ifgs have correct metadata
    for i in itertools.chain(*[ifg_files, mlooked_files, coh_files, mlooked_coh_files]):
        ifg = Ifg(i)
        ifg.open()
        md = ifg.meta_data
        if i.name.endswith('_unw.tif'):
            assert md[ifc.DATA_TYPE] == ifc.ORIG
            assert ifc.IFG_LKSX not in md
            assert ifc.IFG_LKSY not in md
            assert ifc.IFG_CROP not in md
            continue
        if i.name.endswith('_cc.tif'):
            assert md[ifc.DATA_TYPE] == ifc.COH
            assert ifc.IFG_LKSX not in md
            assert ifc.IFG_LKSY not in md
            assert ifc.IFG_CROP not in md
            continue
        if i.name.endswith('_coh.tif'):
            assert md[ifc.DATA_TYPE] == ifc.MULTILOOKED_COH
            assert md[ifc.IFG_LKSX] == '1'
            assert md[ifc.IFG_LKSY] == '1'
            assert md[ifc.IFG_CROP] == '1'
            continue
        if i.name.endswith('_ifg.tif'):
            if coh_mask:
                assert md[ifc.DATA_TYPE] == ifc.MLOOKED_COH_MASKED_IFG
                assert md[ifc.IFG_LKSX] == '1'
                assert md[ifc.IFG_LKSY] == '1'
                assert md[ifc.IFG_CROP] == '1'
            else:
                assert md[ifc.DATA_TYPE] == ifc.MULTILOOKED
                assert md[ifc.IFG_LKSX] == '1'
                assert md[ifc.IFG_LKSY] == '1'
                assert md[ifc.IFG_CROP] == '1'
            continue

    # assert dem has correct metadata
    dem = DEM(dem_file.as_posix())
    dem.open()
    md = dem.dataset.GetMetadata()
    assert md[ifc.DATA_TYPE] == ifc.DEM
    assert ifc.IFG_LKSX not in md
    assert ifc.IFG_LKSY not in md
    assert ifc.IFG_CROP not in md

    dem = DEM(mlooked_dem_file.as_posix())
    dem.open()
    md = dem.dataset.GetMetadata()
    assert md[ifc.DATA_TYPE] == ifc.MLOOKED_DEM
    assert ifc.IFG_LKSX in md
    assert ifc.IFG_LKSY in md
    assert ifc.IFG_CROP in md
    shutil.rmtree(tdir)


# convenience ifg creation funcs
def diff_exts_ifgs():
    """Returns pair of test Ifgs with different extents"""
    bases = ['geo_060619-061002_unw.tif', 'geo_070326-070917_unw.tif']
    headers = ['geo_060619-061002.unw.rsc', 'geo_070326-070917.unw.rsc']
    random_dir = tempfile.mkdtemp()
    for p, h in zip(bases, headers):
        shutil.copy(src=os.path.join(PREP_TEST_TIF, p),
                    dst=os.path.join(random_dir, p))
        shutil.copy(src=os.path.join(PREP_TEST_OBS, h),
                    dst=os.path.join(random_dir, h))
    return [Ifg(join(random_dir, p)) for p in bases], random_dir


def same_exts_ifgs():
    """Return pair of Ifgs with same extents"""
    return [Ifg(join(PREP_TEST_TIF, f)) for f in ('geo_060619-061002.tif', 'geo_070326-070917.tif')]


def extents_from_params(params):
    """Custom extents from supplied parameters"""
    keys = (
    C.IFG_XFIRST, C.IFG_YFIRST, C.IFG_XLAST, C.IFG_YLAST)
    return CustomExts(*[params[k] for k in keys])


def test_extents_from_params():
    xf, yf = 1.0, 2.0
    xl, yl = 5.0, 7.0
    pars = {C.IFG_XFIRST: xf, C.IFG_XLAST: xl,
            C.IFG_YFIRST: yf, C.IFG_YLAST: yl}

    assert extents_from_params(pars) == CustomExts(xf, yf, xl, yl)


class TestPrepifgOutput(UnitTestAdaptation):
    """Tests aspects of the prepifg.py script, such as resampling."""

    @staticmethod
    def assert_geotransform_equal(files):
        """
        Asserts geotransforms for the given files are equivalent. Files can be paths
        to datasets, or GDAL dataset objects.
        """
        assert len(files) > 1, "Need more than 1 file to compare"
        if not all([hasattr(f, "GetGeoTransform") for f in files]):
            datasets = [gdal.Open(f) for f in files]
            assert all(datasets)
        else:
            datasets = files

        transforms = [ds.GetGeoTransform() for ds in datasets]
        head = transforms[0]
        for t in transforms[1:]:
            assert_array_almost_equal(t, head, decimal=6, err_msg="Extents do not match!")

    @classmethod
    def setup_class(cls):
        cls.xs = 0.000833333
        cls.ys = -cls.xs
        cls.ifgs, cls.random_dir = diff_exts_ifgs()
        cls.ifg_paths = [i.data_path for i in cls.ifgs]

        cls.params = Configuration(common.TEST_CONF_ROIPAC).__dict__
        cls.params[C.OUT_DIR] = cls.random_dir
        cls.params[C.GEOMETRY_DIR] = Path(cls.random_dir).joinpath(C.GEOMETRY_DIR)
        cls.params[C.GEOMETRY_DIR].mkdir(exist_ok=True)
        cls.params[C.INTERFEROGRAM_DIR] = Path(cls.random_dir).joinpath(C.INTERFEROGRAM_DIR)
        cls.params[C.INTERFEROGRAM_DIR].mkdir(exist_ok=True)

        cls.headers = [roipac.roipac_header(i.data_path, cls.params) for i in cls.ifgs]
        paths = ["060619-061002_ifg.tif",
                 "060619-061002_ifg.tif",
                 "060619-061002_ifg.tif",
                 "060619-061002_ifg.tif",
                 "070326-070917_ifg.tif",
                 "070326-070917_ifg.tif",
                 "070326-070917_ifg.tif",
                 "070326-070917_ifg.tif"]
        cls.exp_files = [join(cls.random_dir, C.INTERFEROGRAM_DIR,  p) for p in paths]

    @staticmethod
    def test_mlooked_paths():
        test_mlooked_path()

    @staticmethod
    def test_extents_from_params():
        test_extents_from_params()

    @classmethod
    def teardown_class(cls):
        for exp_file in cls.exp_files:
            if exists(exp_file):
                os.remove(exp_file)
        for ifg in cls.ifgs:
            ifg.close()
        shutil.rmtree(cls.random_dir)

    def _custom_ext_latlons(self):
        return [150.91 + (7 * self.xs),  # xfirst
                -34.17 + (16 * self.ys),  # yfirst
                150.91 + (27 * self.xs),  # 20 cells from xfirst
                -34.17 + (44 * self.ys)]  # 28 cells from yfirst

    def _custom_extents_tuple(self):
        return CustomExts(*self._custom_ext_latlons())

    def assert_projection_equal(self, files):
        """
        Asserts preojections for the given files are equivalent.
        Files can be paths to datasets, or GDAL dataset objects.
        """
        assert len(files) > 1, "Need more than 1 file to compare"
        if not all([hasattr(f, "GetGeoTransform") for f in files]):
            datasets = [gdal.Open(f) for f in files]
            assert all(datasets)
        else:
            datasets = files

        projections = [ds.GetProjection() for ds in datasets]
        head = projections[0]
        for t in projections[1:]:
            self.assertEqual(t, head)

    def test_multilooked_projection_same_as_geotiff(self):
        xlooks = ylooks = 1
        exts = get_analysis_extent(crop_opt=MAXIMUM_CROP, rasters=self.ifgs, xlooks=xlooks, ylooks=ylooks,
                                   user_exts=None)
        out_dir = tempfile.mkdtemp()
        params = common.min_params(out_dir)
        params[C.IFG_LKSX] = xlooks
        params[C.IFG_LKSY] = ylooks
        params[C.IFG_CROP_OPT] = MAXIMUM_CROP
        params[C.GEOMETRY_DIR] = Path(out_dir).joinpath(C.GEOMETRY_DIR)
        params[C.GEOMETRY_DIR].mkdir(exist_ok=True)
        params[C.INTERFEROGRAM_DIR] = Path(out_dir).joinpath(C.INTERFEROGRAM_DIR)
        params[C.INTERFEROGRAM_DIR].mkdir(exist_ok=True)

        mlooked_paths = [mlooked_path(f, params, input_type=InputTypes.IFG)
                         for f in self.ifg_paths]
        for i, h, m in zip(self.ifg_paths, self.headers, mlooked_paths):
            prepare_ifg(i, xlooks, ylooks, exts, thresh=0.5, crop_opt=MAXIMUM_CROP, header=h, write_to_disk=True,
                        out_path=m)
        self.assert_projection_equal(self.ifg_paths + mlooked_paths)

    def test_default_max_extents(self):
        """Test ifgcropopt=2 crops datasets to max bounding box extents."""
        xlooks = ylooks = 1
        prepare_ifgs(self.ifg_paths, MAXIMUM_CROP, xlooks, ylooks, self.headers, params=self.params)
        for f in [self.exp_files[1], self.exp_files[5]]:
            self.assertTrue(exists(f), msg="Output files not created")

        # output files should have same extents
        # NB: also verifies gdalwarp correctly copies geotransform across
        ifg = Ifg(self.exp_files[1])
        ifg.open()
        gt = ifg.dataset.GetGeoTransform()

        # copied from gdalinfo output
        exp_gt = (150.91, 0.000833333, 0, -34.17, 0, -0.000833333)
        for i, j in zip(gt, exp_gt):
            self.assertAlmostEqual(i, j)

        self.assert_geotransform_equal([self.exp_files[1], self.exp_files[5]])

        ifg.close()
        for i in self.ifgs:
            i.close()

    def test_min_extents(self):
        """Test ifgcropopt=1 crops datasets to min extents."""
        xlooks = ylooks = 1
        prepare_ifgs(self.ifg_paths, MINIMUM_CROP, xlooks, ylooks, headers=self.headers, params=self.params)
        ifg = Ifg(self.exp_files[0])
        ifg.open()

        # output files should have same extents
        # NB: also verifies gdalwarp correctly copies geotransform across
        # NB: expected data copied from gdalinfo output
        gt = ifg.dataset.GetGeoTransform()
        exp_gt = (150.911666666, 0.000833333, 0, -34.172499999, 0, -0.000833333)
        for i, j in zip(gt, exp_gt):
            self.assertAlmostEqual(i, j)
        self.assert_geotransform_equal([self.exp_files[0], self.exp_files[4]])

        ifg.close()
        for i in self.ifgs:
            i.close()

    def test_custom_extents(self):
        xlooks = ylooks = 1
        cext = self._custom_extents_tuple()
        prepare_ifgs(self.ifg_paths, CUSTOM_CROP, xlooks, ylooks, headers=self.headers,
                     user_exts=cext, params=self.params)

        ifg = Ifg(self.exp_files[2])
        ifg.open()

        gt = ifg.dataset.GetGeoTransform()
        exp_gt = (cext.xfirst, self.xs, 0, cext.yfirst, 0, self.ys)

        for i, j in zip(gt, exp_gt):
            self.assertAlmostEqual(i, j)
        self.assert_geotransform_equal([self.exp_files[2], self.exp_files[6]])

        # close ifgs
        ifg.close()
        for i in self.ifgs:
            i.close()

    def test_exception_without_all_4_crop_parameters(self):
        """Test misaligned cropping extents raise errors."""
        xlooks = ylooks = 1
        # empty string and none raises exceptio
        for i in [None, '']:
            cext = (150.92, -34.18, 150.94, i)
            self.assertRaises(PreprocessError, prepare_ifgs, self.ifg_paths,
                              CUSTOM_CROP, xlooks, ylooks, self.headers, user_exts=cext, params=self.params)
        # three parameters provided
        self.assertRaises(PreprocessError, prepare_ifgs, self.ifg_paths,
                          CUSTOM_CROP, xlooks, ylooks, self.headers, params=self.params,
                          user_exts=(150.92, -34.18, 150.94))
        # close ifgs
        for i in self.ifgs:
            i.close()

    def test_custom_extents_misalignment(self):
        """Test misaligned cropping extents raise errors."""
        xlooks = ylooks = 1
        latlons = tuple(self._custom_ext_latlons())
        for i, _ in enumerate(['xfirst', 'yfirst', 'xlast', 'ylast']):
            # error = step / pi * [1000 100]
            for error in [0.265258, 0.026526]:
                tmp_latlon = list(latlons)
                tmp_latlon[i] += error
                cext = CustomExts(*tmp_latlon)

                self.assertRaises(PreprocessError, prepare_ifgs, self.ifg_paths, CUSTOM_CROP, xlooks, ylooks,
                                  user_exts=cext, headers=self.headers, params=self.params)
        # close ifgs
        for i in self.ifgs:
            i.close()

    def test_nodata(self):
        """Verify NODATA value copied correctly (amplitude band not copied)"""
        xlooks = ylooks = 1
        prepare_ifgs(self.ifg_paths, MINIMUM_CROP, xlooks, ylooks, self.headers, self.params)

        for ex in [self.exp_files[0], self.exp_files[4]]:
            ifg = Ifg(ex)
            ifg.open()
            # NB: amplitude band doesn't have a NODATA value
            self.assertTrue(isnan(ifg.dataset.GetRasterBand(1).GetNoDataValue()))
            ifg.close()
        for i in self.ifgs:
            i.close()

    def test_nans(self):
        """Verify NaNs replace 0 in the multilooked phase band"""
        xlooks = ylooks = 1
        prepare_ifgs(self.ifg_paths, MINIMUM_CROP, xlooks, ylooks, self.headers, self.params)

        for ex in [self.exp_files[0], self.exp_files[4]]:
            ifg = Ifg(ex)
            ifg.open()

            phase = ifg.phase_band.ReadAsArray()
            self.assertFalse((phase == 0).any())
            self.assertTrue((isnan(phase)).any())
            ifg.close()

        self.assertAlmostEqual(nanmax(phase), 4.247, 3)  # copied from gdalinfo
        self.assertAlmostEqual(nanmin(phase), 0.009, 3)  # copied from gdalinfo
        for i in self.ifgs:
            i.close()

    def test_multilook(self):
        """Test resampling method using a scaling factor of 4"""
        scale = 4  # assumes square cells
        self.ifgs.append(DEM(SML_TEST_DEM_TIF))
        self.ifg_paths = [i.data_path for i in self.ifgs]
        # append the dem header
        self.headers.append(SML_TEST_DEM_HDR)
        cext = self._custom_extents_tuple()
        xlooks = ylooks = scale
        prepare_ifgs(self.ifg_paths, CUSTOM_CROP, xlooks, ylooks, thresh=1.0, user_exts=cext,
                     headers=self.headers, params=self.params)

        for n, ipath in enumerate([self.exp_files[3], self.exp_files[7]]):
            i = Ifg(ipath)
            i.open()
            self.assertEqual(i.dataset.RasterXSize, 20 / scale)
            self.assertEqual(i.dataset.RasterYSize, 28 / scale)

            # verify resampling
            path = join(PREP_TEST_TIF, "geo_%s.tif" % Path(ipath).name.split('_ifg')[0])
            ds = gdal.Open(path)
            src_data = ds.GetRasterBand(2).ReadAsArray()
            exp_resample = multilooking(src_data, scale, scale, thresh=0)
            self.assertEqual(exp_resample.shape, (7, 5))
            assert_array_almost_equal(exp_resample, i.phase_band.ReadAsArray())
            ds = None
            i.close()
            os.remove(ipath)

        # verify DEM has been correctly processed
        # ignore output values as resampling has already been tested for phase
        exp_dem_path = join(self.params[C.GEOMETRY_DIR], 'dem.tif')
        self.assertTrue(exists(exp_dem_path))
        orignal_dem = DEM(SML_TEST_DEM_TIF)
        orignal_dem.open()
        dem_dtype = orignal_dem.dataset.GetRasterBand(1).DataType
        orignal_dem.close()
        dem = DEM(exp_dem_path)
        dem.open()

        # test multilooked dem is of the same datatype as the original dem tif
        self.assertEqual(dem_dtype, dem.dataset.GetRasterBand(1).DataType)

        self.assertEqual(dem.dataset.RasterXSize, 20 / scale)
        self.assertEqual(dem.dataset.RasterYSize, 28 / scale)
        data = dem.data
        self.assertTrue(data.ptp() != 0)
        # close ifgs
        dem.close()
        for i in self.ifgs:
            i.close()
        os.remove(exp_dem_path)

    def test_output_datatype(self):
        """Test resampling method using a scaling factor of 4"""
        scale = 4  # assumes square cells
        self.ifgs.append(DEM(SML_TEST_DEM_TIF))
        ifg_paths = [i.data_path for i in self.ifgs]
        data_types = [InputTypes.IFG] * len(self.ifg_paths)
        ifg_paths.append(SML_TEST_DEM_TIF)
        data_types.append(InputTypes.DEM)
        self.headers.append(SML_TEST_DEM_HDR)

        cext = self._custom_extents_tuple()
        xlooks = ylooks = scale
        prepare_ifgs(ifg_paths, CUSTOM_CROP, xlooks, ylooks, thresh=1.0, user_exts=cext, headers=self.headers,
                     params=self.params)
        self.params[C.IFG_LKSX] = xlooks
        self.params[C.IFG_LKSY] = ylooks
        self.params[C.IFG_CROP_OPT] = CUSTOM_CROP
        for i, t in zip(ifg_paths, data_types):
            mlooked_ifg = mlooked_path(i, self.params, input_type=t)
            ds1 = DEM(mlooked_ifg)
            ds1.open()
            ds2 = DEM(i)
            ds2.open()
            self.assertEqual(ds1.dataset.GetRasterBand(1).DataType,
                             ds2.dataset.GetRasterBand(1).DataType)
            ds1 = ds2 = None

    def test_invalid_looks(self):
        """Verify only numeric values can be given for multilooking"""
        values = [0, -1, -10, -100000.6, ""]
        for v in values:
            self.assertRaises(PreprocessError, prepare_ifgs, self.ifg_paths,
                              CUSTOM_CROP, xlooks=v, ylooks=1, headers=self.headers, params=self.params)

            self.assertRaises(PreprocessError, prepare_ifgs, self.ifg_paths,
                              CUSTOM_CROP, xlooks=1, ylooks=v, headers=self.headers, params=self.params)


class TestThresholdTests(UnitTestAdaptation):
    """Tests for threshold of data -> NaN during resampling."""

    def test_nan_threshold_inputs(self):
        data = ones((1, 1))
        for thresh in [-10, -1, -0.5, 1.000001, 10]:
            self.assertRaises(ValueError, _resample, data, 2, 2, thresh)

    @staticmethod
    def test_nan_threshold():
        # test threshold based on number of NaNs per averaging tile
        data = ones((2, 10))
        data[0, 3:] = nan
        data[1, 7:] = nan

        # key: NaN threshold as a % of pixels, expected result
        expected = [(0.0, [1, nan, nan, nan, nan]),
                    (0.25, [1, nan, nan, nan, nan]),
                    (0.5, [1, 1, nan, nan, nan]),
                    (0.75, [1, 1, 1, nan, nan]),
                    (1.0, [1, 1, 1, 1, nan])]

        for thresh, exp in expected:
            res = _resample(data, xscale=2, yscale=2, thresh=thresh)
            assert_array_equal(res, reshape(exp, res.shape))

    @staticmethod
    def test_nan_threshold_alt():
        # test threshold on odd numbers
        data = ones((3, 6))
        data[0] = nan
        data[1, 2:5] = nan

        expected = [(0.4, [nan, nan]), (0.5, [1, nan]), (0.7, [1, 1])]
        for thresh, exp in expected:
            res = _resample(data, xscale=3, yscale=3, thresh=thresh)
            assert_array_equal(res, reshape(exp, res.shape))


class TestSameSizeTests(UnitTestAdaptation):
    """Tests aspects of the prepifg.py script, such as resampling."""

    @classmethod
    def setup_class(cls):
        import datetime
        cls.xs = 0.000833333
        cls.ys = -cls.xs
        cls.headers = [
            {'NCOLS': 47, 'NROWS': 72, 'LAT': -34.17, 'LONG': 150.91, 'X_STEP': 0.000833333, 'Y_STEP': -0.000833333,
             'WAVELENGTH_METRES': 0.0562356424, 'FIRST_DATE': datetime.date(2007, 3, 26),
             'SECOND_DATE': datetime.date(2007, 9, 17), 'TIME_SPAN_YEAR': 0.4791238877481177,
             'DATA_UNITS': 'RADIANS', 'INSAR_PROCESSOR': 'ROIPAC', 'X_LAST': 150.94916665099998,
             'Y_LAST': -34.229999976, 'DATUM': 'WGS84', 'DATA_TYPE': 'ORIGINAL_IFG'},
            {'NCOLS': 47, 'NROWS': 72, 'LAT': -34.17, 'LONG': 150.91, 'X_STEP': 0.000833333, 'Y_STEP': -0.000833333,
             'WAVELENGTH_METRES': 0.0562356424, 'FIRST_DATE': datetime.date(2007, 3, 26),
             'SECOND_DATE': datetime.date(2007, 9, 17), 'TIME_SPAN_YEAR': 0.4791238877481177,
             'DATA_UNITS': 'RADIANS', 'INSAR_PROCESSOR': 'ROIPAC', 'X_LAST': 150.94916665099998,
             'Y_LAST': -34.229999976, 'DATUM': 'WGS84', 'DATA_TYPE': 'ORIGINAL_IFG'}
        ]
        out_dir = tempfile.mkdtemp()
        cls.params = common.min_params(out_dir)
        cls.params[C.GEOMETRY_DIR] = Path(cls.params[C.OUT_DIR]).joinpath(C.GEOMETRY_DIR)
        cls.params[C.GEOMETRY_DIR].mkdir(exist_ok=True)
        cls.params[C.INTERFEROGRAM_DIR] = Path(cls.params[C.OUT_DIR]).joinpath(C.INTERFEROGRAM_DIR)
        cls.params[C.INTERFEROGRAM_DIR].mkdir(exist_ok=True)


    # TODO: check output files for same extents?
    # TODO: make prepifg dir readonly to test output to temp dir
    # TODO: move to class for testing same size option?
    def test_already_same_size(self):
        # should do nothing as layers are same size & no multilooking required
        ifgs = same_exts_ifgs()
        ifg_data_paths = [d.data_path for d in ifgs]
        res_tup = prepare_ifgs(ifg_data_paths, ALREADY_SAME_SIZE, 1, 1, self.headers, self.params)
        res = [r[1] for r in res_tup]
        self.assertTrue(all(res))

    def test_already_same_size_mismatch(self):
        ifgs, random_dir = diff_exts_ifgs()
        ifg_data_paths = [d.data_path for d in ifgs]
        self.assertRaises(PreprocessError, prepare_ifgs, ifg_data_paths, ALREADY_SAME_SIZE, 1, 1, self.headers,
                          self.params)
        for i in ifgs:
            i.close()
        shutil.rmtree(random_dir)

    # TODO: ensure multilooked files written to output dir
    def test_same_size_multilooking(self):
        ifgs = same_exts_ifgs()
        ifg_data_paths = [d.data_path for d in ifgs]
        xlooks = ylooks = 2
        out_dir = tempfile.mkdtemp()
        params = common.min_params(out_dir)
        params[C.IFG_LKSX] = xlooks
        params[C.IFG_LKSY] = ylooks
        params[C.IFG_CROP_OPT] = ALREADY_SAME_SIZE
        params[C.GEOMETRY_DIR] = Path(params[C.OUT_DIR]).joinpath(C.GEOMETRY_DIR)
        params[C.GEOMETRY_DIR].mkdir(exist_ok=True)
        params[C.INTERFEROGRAM_DIR] = Path(params[C.OUT_DIR]).joinpath(C.INTERFEROGRAM_DIR)
        params[C.INTERFEROGRAM_DIR].mkdir(exist_ok=True)

        prepare_ifgs(ifg_data_paths, ALREADY_SAME_SIZE, xlooks, ylooks, self.headers, params)
        looks_paths = [mlooked_path(d, params, input_type=InputTypes.IFG) for d in ifg_data_paths]
        mlooked = [Ifg(i) for i in looks_paths]
        for m in mlooked:
            m.open()
        self.assertEqual(len(mlooked), 2)

        for ifg in mlooked:
            self.assertAlmostEqual(ifg.x_step, xlooks * self.xs)
            self.assertAlmostEqual(ifg.x_step, ylooks * self.xs)
            os.remove(ifg.data_path)


def mlooked_path(path, params, input_type):
    m = MultiplePaths(path, params=params, input_type=input_type)
    return m.sampled_path


def test_mlooked_path():
    path = 'geo_060619-061002_unw.tif'
    for xlks, ylks, cr, input_type in product([2, 4, 8], [4, 2, 5], [1, 2, 3, 4], [InputTypes.IFG, InputTypes.COH]):
        out_dir = tempfile.mkdtemp()
        params = common.min_params(out_dir)
        params[C.IFG_LKSX] = xlks
        params[C.IFG_LKSY] = ylks
        params[C.IFG_CROP_OPT] = cr
        m = mlooked_path(path, params, input_type=input_type)
        assert Path(m).name == f'060619-061002_{input_type.value}.tif'


# class LineOfSightTests(unittest.TestCase):
# def test_los_conversion(self):
# TODO: needs LOS matrix
# TODO: this needs to work from config and incidence files on disk
# TODO: is convflag (see 'ifgconv' setting) used or just defaulted?
# TODO: los conversion has 4 options: 1: ignore, 2: vertical, 3: N/S, 4: E/W
# also have a 5th option of arbitrary azimuth angle (PyRate doesn't have this)
#    params = _default_extents_param()
#    params[IFG_CROP_OPT] = MINIMUM_CROP
#    params[PROJECTION_FLAG] = None
#    prepare_ifgs(params)


# def test_phase_conversion(self):
# TODO: check output data is converted to mm from radians (in prepifg??)
# raise NotImplementedError


class TestLocalMultilookTests:
    """Tests for local testing functions"""

    @staticmethod
    def test_multilooking_thresh():
        data = ones((3, 6))
        data[0] = nan
        data[1, 2:5] = nan
        expected = [(6, [nan, nan]),
                    (5, [1, nan]),
                    (4, [1, 1])]
        scale = 3
        for thresh, exp in expected:
            res = multilooking(data, scale, scale, thresh)
            assert_array_equal(res, reshape(exp, res.shape))


def multilooking(src, xscale, yscale, thresh=0):
    """
    src: numpy array of phase data
    thresh: min number of non-NaNs required for a valid tile resampling
    """
    thresh = int(thresh)
    num_cells = xscale * yscale
    if thresh > num_cells or thresh < 0:
        msg = "Invalid threshold: %s (need 0 <= thr <= %s" % (thresh, num_cells)
        raise ValueError(msg)

    rows, cols = src.shape
    rows_lowres = int(floor(rows / yscale))
    cols_lowres = int(floor(cols / xscale))
    dest = ones((rows_lowres, cols_lowres)) * nan

    size = xscale * yscale
    for row in range(rows_lowres):
        for col in range(cols_lowres):
            ys = row * yscale
            ye = ys + yscale
            xs = col * xscale
            xe = xs + xscale

            patch = src[ys:ye, xs:xe]
            num_values = num_cells - npsum(isnan(patch))

            if num_values >= thresh and num_values > 0:
                # nanmean() only works on 1g axis
                reshaped = patch.reshape(size)
                dest[row, col] = nanmean(reshaped)

    return dest


class TestLegacyEqualityTestRoipacSmallTestData(UnitTestAdaptation):
    """
    Legacy roipac prepifg equality test for small test data
    """

    def setup_class(cls):
        from tests.common import small_data_setup
        cls.ifgs = small_data_setup()
        cls.ifg_paths = [i.data_path for i in cls.ifgs]
        params = Configuration(common.TEST_CONF_ROIPAC).__dict__
        cls.headers = [roipac.roipac_header(i.data_path, params) for i in cls.ifgs]
        params[C.IFG_LKSX], params[C.IFG_LKSY], params[
            C.IFG_CROP_OPT] = 1, 1, 1
        prepare_ifgs(cls.ifg_paths, crop_opt=1, xlooks=1, ylooks=1, headers=cls.headers, params=params)
        looks_paths = [mlooked_path(d, params, t) for d, t in zip(cls.ifg_paths, [InputTypes.IFG]*len(cls.ifgs))]
        cls.ifgs_with_nan = [Ifg(i) for i in looks_paths]
        for ifg in cls.ifgs_with_nan:
            ifg.open()

    @classmethod
    def teardown_class(cls):
        for i in cls.ifgs_with_nan:
            if os.path.exists(i.data_path):
                i.close()
                os.remove(i.data_path)

    def test_legacy_prepifg_equality_array(self):
        """
        Legacy prepifg equality test
        """
        # path to csv folders from legacy output
        onlyfiles = [
            fln for fln in os.listdir(SML_TEST_LEGACY_PREPIFG_DIR)
            if os.path.isfile(os.path.join(SML_TEST_LEGACY_PREPIFG_DIR, fln))
            and fln.endswith('.csv') and fln.__contains__('_rad_')
            ]

        for fln in onlyfiles:
            ifg_data = np.genfromtxt(os.path.join(
                SML_TEST_LEGACY_PREPIFG_DIR, fln), delimiter=',')
            for k, j in enumerate(self.ifgs):
                if fln.split('_rad_')[-1].split('.')[0] == \
                        os.path.split(j.data_path)[-1].split('.')[0]:
                    np.testing.assert_array_almost_equal(ifg_data,
                                                         self.ifgs_with_nan[
                                                             k].phase_data,
                                                         decimal=2)

    def test_legacy_prepifg_and_convert_phase(self):
        """
        Legacy data prepifg equality test
        """
        # path to csv folders from legacy output
        for i in self.ifgs_with_nan:
            if not i.mm_converted:
                i.convert_to_mm()
        onlyfiles = [
            f for f in os.listdir(SML_TEST_LEGACY_PREPIFG_DIR)
            if os.path.isfile(os.path.join(SML_TEST_LEGACY_PREPIFG_DIR, f))
            and f.endswith('.csv') and f.__contains__('_mm_')]

        count = 0
        for i, f in enumerate(onlyfiles):
            ifg_data = np.genfromtxt(os.path.join(
                SML_TEST_LEGACY_PREPIFG_DIR, f), delimiter=',')
            for k, j in enumerate(self.ifgs):
                if f.split('_mm_')[-1].split('.')[0] == \
                        os.path.split(j.data_path)[-1].split('_unw.')[0]:
                    count += 1
                    # all numbers equal
                    np.testing.assert_array_almost_equal(
                        ifg_data, self.ifgs_with_nan[k].phase_data, decimal=2)

                    # means must also be equal
                    self.assertAlmostEqual(
                        nanmean(ifg_data),
                        nanmean(self.ifgs_with_nan[k].phase_data),
                        places=4)

                    # number of nans must equal
                    self.assertEqual(
                        np.sum(np.isnan(ifg_data)),
                        np.sum(np.isnan(self.ifgs_with_nan[k].phase_data)))

        # ensure we have the correct number of matches
        self.assertEqual(count, len(self.ifgs))


class TestOneIncidenceOrElevationMap(UnitTestAdaptation):

    @classmethod
    def setup_class(cls):
        cls.base_dir = tempfile.mkdtemp()
        cls.conf_file = tempfile.mktemp(suffix='.conf', dir=cls.base_dir)
        cls.ifgListFile = os.path.join(common.GAMMA_SML_TEST_DIR, 'ifms_17')
        cls.baseListFile = os.path.join(common.GAMMA_SML_TEST_DIR, 'baseline_17')

    @classmethod
    def teardown_class(cls):
        params = Configuration(cls.conf_file).__dict__
        shutil.rmtree(cls.base_dir)
        common.remove_tifs(params[WORKING_DIR])

    def make_input_files(self, inc='', ele=''):
        with open(self.conf_file, 'w') as conf:
            conf.write('{}: {}\n'.format(C.NO_DATA_VALUE, '0.0'))
            conf.write('{}: {}\n'.format(WORKING_DIR, common.GAMMA_SML_TEST_DIR))
            conf.write('{}: {}\n'.format(C.OUT_DIR, self.base_dir))
            conf.write('{}: {}\n'.format(C.IFG_FILE_LIST, self.ifgListFile))
            conf.write('{}: {}\n'.format(C.BASE_FILE_LIST, self.baseListFile))
            conf.write('{}: {}\n'.format(C.PROCESSOR, '1'))
            conf.write('{}: {}\n'.format(
                C.DEM_HEADER_FILE, os.path.join(
                    common.GAMMA_SML_TEST_DIR, '20060619_utm_dem.par')))
            conf.write('{}: {}\n'.format(C.IFG_LKSX, '1'))
            conf.write('{}: {}\n'.format(C.IFG_LKSY, '1'))
            conf.write('{}: {}\n'.format(C.IFG_CROP_OPT, '1'))
            conf.write('{}: {}\n'.format(C.NO_DATA_AVERAGING_THRESHOLD, '0.5'))
            conf.write('{}: {}\n'.format(C.HDR_FILE_LIST,
                                         common.SML_TEST_GAMMA_HEADER_LIST))
            conf.write('{}: {}\n'.format(C.DEM_FILE, common.SML_TEST_DEM_GAMMA))
            conf.write('{}: {}\n'.format(C.APS_INCIDENCE_MAP, inc))
            conf.write('{}: {}\n'.format(C.APS_ELEVATION_MAP, ele))
            conf.write('{}: {}\n'.format(C.APS_CORRECTION, '1'))
            conf.write('{}: {}\n'.format(C.APS_METHOD, '2'))
            conf.write('{}: {}\n'.format(C.TIME_SERIES_SM_ORDER, 1))

    def common_check(self, ele, inc):
        import glob
        from pyrate.configuration import Configuration
        assert os.path.exists(self.conf_file)

        params = Configuration(self.conf_file).__dict__

        conv2tif.main(params)
        sys.argv = ['dummy', self.conf_file]
        prepifg.main(params)
        # test 17 geotiffs created
        geotifs = glob.glob(os.path.join(params[C.OUT_DIR], '*_unw.tif'))
        self.assertEqual(17, len(geotifs))
        # test dem geotiff created
        demtif = glob.glob(os.path.join(params[C.OUT_DIR], '*_dem.tif'))
        self.assertEqual(1, len(demtif))
        # mlooked tifs
        mlooked_tifs = glob.glob(os.path.join(self.base_dir, '*_ifg.tif'))
        mlooked_tifs.append(os.path.join(self.base_dir, 'dem.tif'))
        # 19 including 17 ifgs, 1 dem and one incidence
        self.assertEqual(18, len(mlooked_tifs))
        inc = glob.glob(os.path.join(self.base_dir, '*utm_{inc}.tif'.format(inc=inc)))
        self.assertEqual(0, len(inc))


def prepare_ifgs(raster_data_paths, crop_opt, xlooks, ylooks, headers, params, thresh=0.5, user_exts=None,
                 write_to_disc=True):
    """
    Wrapper function to prepare a sequence of interferogram files for
    PyRate analysis. See prepifg.prepare_ifg() for full description of
    inputs and returns.

    Note: function need refining for crop options

    :param list raster_data_paths: List of interferogram file paths
    :param int crop_opt: Crop option
    :param int xlooks: Number of multi-looks in x; 5 is 5 times smaller, 1 is no change
    :param int ylooks: Number of multi-looks in y
    :param float thresh: see thresh in prepare_ifgs()
    :param tuple user_exts: Tuple of user defined georeferenced extents for
        new file: (xfirst, yfirst, xlast, ylast)cropping coordinates
    :param bool write_to_disc: Write new data to disk

    :return: resampled_data: output cropped and resampled image
    :rtype: ndarray
    :return: out_ds: destination gdal dataset object
    :rtype: List[gdal.Dataset]
    """
    if xlooks != ylooks:
        log.warning('X and Y multi-look factors are not equal')

    # use metadata check to check whether it's a dem or ifg
    rasters = [dem_or_ifg(r) for r in raster_data_paths]
    exts = get_analysis_extent(crop_opt, rasters, xlooks, ylooks, user_exts)
    out_paths = []
    for r, t in zip(raster_data_paths, rasters):
        if isinstance(t, DEM):
            input_type = InputTypes.DEM
        else:
            input_type = InputTypes.IFG
        out_path = MultiplePaths(r, params, input_type).sampled_path
        out_paths.append(out_path)
    return [prepare_ifg(d, xlooks, ylooks, exts, thresh, crop_opt, h, write_to_disc, p) for d, h, p
            in zip(raster_data_paths, headers, out_paths)]


@pytest.mark.mpi
@pytest.mark.slow
@pytest.mark.skipif(not PY37GDAL302, reason="Only run in one CI env")
def test_coh_stats_equality(mexico_cropa_params):
    subprocess.run(f"mpirun -n 3 pyrate prepifg -f {MEXICO_CROPA_CONF.as_posix()}", shell=True)
    params = mexico_cropa_params
    mexico_cropa_coh_stats_dir = Path(BASE_TEST).joinpath("cropA", 'coherence_stats')
    assert_two_dirs_equal(params[C.COHERENCE_DIR], mexico_cropa_coh_stats_dir, ext="coh*.tif", num_files=3)

    # assert metadata was written
    from pyrate.prepifg import out_type_md_dict
    for stat_tif in mexico_cropa_coh_stats_dir.glob("coh*.tif"):
        ds = gdal.Open(stat_tif.as_posix())
        md = ds.GetMetadata()
        expected_md = out_type_md_dict[stat_tif.stem.upper()]
        assert ifc.DATA_TYPE in md.keys()
        assert expected_md in md.values()

back to top

Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Contact— JavaScript license information— Web API