Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • cbdbb26
  • /
  • test_shared.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge Iframe embedding
swh:1:cnt:85f994c86b3a55b09ed1ac8c4178b3f569e36140
directory badge Iframe embedding
swh:1:dir:cbdbb26ae74738c98efa42d41469138b9a20b5ce

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
test_shared.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2021 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
This Python module contains tests for the shared.py PyRate module.
"""
import os
import shutil
import sys
import random
import string
import tempfile
import pytest
from pathlib import Path
from itertools import product
from numpy import isnan, where, nan
from os.path import join, basename, exists
from stat import S_IRGRP, S_IWGRP, S_IWOTH, S_IROTH, S_IRUSR, S_IWUSR

import numpy as np
from numpy.testing import assert_array_equal
from osgeo import gdal
from osgeo.gdal import Open, Dataset, UseExceptions

import pyrate.constants as C
import tests.common
from tests.common import SML_TEST_TIF, SML_TEST_DEM_TIF, TEMPDIR, WORKING_DIR
from pyrate.core import shared, ifgconstants as ifc, gamma
from pyrate.core.shared import dem_or_ifg
from pyrate.core import mpiops
from pyrate import prepifg, conv2tif
from pyrate.configuration import Configuration, MultiplePaths
from pyrate.core.shared import Ifg, DEM, RasterException
from pyrate.core.shared import cell_size, _utm_zone, tiles_split

from tests import common

UseExceptions()

if not exists(SML_TEST_TIF):
    sys.exit("ERROR: Missing small_test data for unit tests\n")


class TestIfgTests:
    """Unit tests for the Ifg/interferogram class."""

    def setup_class(cls):
        cls.ifg = Ifg(join(SML_TEST_TIF, 'geo_060619-061002_unw.tif'))
        cls.ifg.open()
        cls.ifg.nodata_value = 0

    def test_headers_as_attr(self):
        for a in ['ncols', 'nrows', 'x_first', 'x_step',
                  'y_first', 'y_step', 'wavelength', 'first', 'second']:
            assert getattr(self.ifg, a) is not None

    def test_convert_to_nans(self):
        self.ifg.convert_to_nans()
        assert self.ifg.nan_converted

    def test_xylast(self):
        # ensure the X|Y_LAST header element has been created
        assert self.ifg.x_last == pytest.approx(150.9491667)
        assert self.ifg.y_last == pytest.approx(-34.23)

    def test_num_cells(self):
        # test cell size from header elements
        data = self.ifg.phase_band.ReadAsArray()
        ys, xs = data.shape
        exp_ncells = ys * xs
        assert exp_ncells == self.ifg.num_cells

    def test_shape(self):
        assert self.ifg.shape == self.ifg.phase_data.shape

    def test_nan_count(self):
        num_nan = 0
        for row in self.ifg.phase_data:
            for v in row:
                if isnan(v):
                    num_nan += 1
        if self.ifg.nan_converted:
            assert num_nan == self.ifg.nan_count
        else:
            assert num_nan == 0

    def test_phase_band(self):
        data = self.ifg.phase_band.ReadAsArray()
        assert data.shape == (72, 47)

    def test_nan_fraction(self):
        # NB: source data lacks 0 -> NaN conversion
        data = self.ifg.phase_data
        data = where(data == 0, nan, data) # fake 0 -> nan for the count below

        # manually count # nan cells
        nans = 0
        ys, xs = data.shape
        for y, x in product(range(ys), range(xs)):
            if isnan(data[y, x]):
                nans += 1
        del data

        num_cells = float(ys * xs)
        assert nans > 0
        assert nans <= num_cells
        assert nans / num_cells == self.ifg.nan_fraction

    def test_xy_size(self):
        assert ~ (self.ifg.ncols is None)
        assert ~ (self.ifg.nrows is None)

        # test with tolerance from base 90m cell
        # within 2% of cells over small?
        assert self.ifg.y_size > 88.0
        assert self.ifg.y_size < 92.0, 'Got %s' % self.ifg.y_size

        width = 76.9 # from nearby PyRate coords
        assert self.ifg.x_size > 0.97 * width  # ~3% tolerance
        assert  self.ifg.x_size < 1.03 * width

    def test_centre_latlong(self):
        lat_exp = self.ifg.y_first + \
                  (int(self.ifg.nrows / 2) * self.ifg.y_step)
        long_exp = self.ifg.x_first + \
                   (int(self.ifg.ncols / 2) * self.ifg.x_step)
        assert lat_exp == self.ifg.lat_centre
        assert long_exp == self.ifg.long_centre

    def test_centre_cell(self):
        assert self.ifg.x_centre == 23
        assert self.ifg.y_centre == 36

    def test_time_span(self):
        assert self.ifg.time_span == pytest.approx(0.287474332649)

    def test_wavelength(self):
        assert self.ifg.wavelength == pytest.approx(0.0562356424)


class TestIfgIOTests:

    def setup_method(self):
        self.ifg = Ifg(join(SML_TEST_TIF, 'geo_070709-070813_unw.tif'))
        self.header = join(common.ROIPAC_SML_TEST_DIR, 'geo_070709-070813_unw.rsc')

    def test_open(self):
        assert self.ifg.dataset is None
        assert self.ifg.is_open is False
        self.ifg.open(readonly=True)
        assert self.ifg.dataset is not None
        assert self.ifg.is_open is True
        assert isinstance(self.ifg.dataset, Dataset)

        # ensure open cannot be called twice
        with pytest.raises(RasterException):
            self.ifg.open(True)

    def test_open_ifg_from_dataset(self):
        """
        Test showing open() can not be used for Ifg created with
        gdal.Dataset object as Dataset has already been read in
        """
        self.ifg.open()
        dataset = self.ifg.dataset
        new_ifg = Ifg(dataset)
        with pytest.raises(RasterException):
            new_ifg.open()

    def test_write(self):
        base = TEMPDIR
        src = self.ifg.data_path
        dest = join(base, basename(self.ifg.data_path))

        # shutil.copy needs to copy writeable permission from src
        os.chmod(src, S_IRGRP | S_IWGRP | S_IWOTH | S_IROTH |
                 S_IRUSR | S_IWUSR)
        shutil.copy(src, dest)
        os.chmod(src, S_IRGRP | S_IROTH | S_IRUSR)  # revert

        i = Ifg(dest)
        i.open()
        i.phase_data[0, 1:] = nan
        i.write_modified_phase()
        del i

        # reopen to ensure data/nans can be read back out
        i = Ifg(dest)
        i.open(readonly=True)
        assert_array_equal(True, isnan(i.phase_data[0, 1:]))
        i.close()
        os.remove(dest)

    def test_write_fails_on_readonly(self):
        # check readonly status is same before
        # and after open() for readonly file
        assert self.ifg.is_read_only
        self.ifg.open(readonly=True)
        assert self.ifg.is_read_only
        with pytest.raises(IOError):
            self.ifg.write_modified_phase()

    def test_phase_band_unopened_ifg(self):
        try:
            _ = self.ifg.phase_band
            self.fail("Should not be able to access band without open dataset")
        except RasterException:
            pass

    def test_nan_fraction_unopened(self):
        try:
            # NB: self.assertRaises doesn't work here (as it is a property?)
            _ = self.ifg.nan_fraction
            self.fail("Shouldn't be able to "
                      "call nan_fraction() with unopened Ifg")
        except RasterException:
            pass

    def test_phase_data_properties(self):
        # Use raw GDAL to isolate raster reading from Ifg functionality
        ds = Open(self.ifg.data_path)
        data = ds.GetRasterBand(1).ReadAsArray()
        del ds

        self.ifg.open()

        # test full array and row by row access
        assert_array_equal(data, self.ifg.phase_data)
        for y, row in enumerate(self.ifg.phase_rows):
            assert_array_equal(data[y], row)

        # test the data is cached if changed
        crd = (5, 4)
        orig = self.ifg.phase_data[crd]
        self.ifg.phase_data[crd] *= 2
        nv = self.ifg.phase_data[crd]  # pull new value out again
        assert nv == 2 * orig


# FIXME:
# class IncidenceFileTests():
#     'Unit tests to verify operations on GeoTIFF format Incidence rasters'
#
#     def setUp(self):
#         raise NotImplementedError
#         self.inc = Incidence(join(INCID_TEST_DIR, '128x2.tif'))
#         self.inc.open()
#
#
#     def test_incidence_data(self):
#         # check incidences rises while traversing the scene
#         data = self.inc.incidence_data
#         diff = data.ptp()
#         self.assertTrue(diff > 0.5, "Got ptp() diff of %s" % diff)
#
#         # ascending pass, values should increase from W->E across scene
#         for i in range(2):
#             d = data[i]
#             self.assertFalse((d == 0).any()) # ensure no NODATA
#             self.assertFalse((isnan(d)).any())
#
#             diff = array([d[i+1] - d[i] for i in range(len(d)-1)])
#             res = abs(diff[diff < 0])
# TODO: check if this is normal
#             self.assertTrue((res < 1e-4).all())
#
#
#     def test_azimuth_data(self):
#         # ensure azimuth is fairly constant
#
#         az = self.inc.azimuth_data
#         self.assertFalse((az == 0).all())
#         az = az[az != 0] # filter NODATA cells
#
#         # azimuth should be relatively constant
#         ptp = az.ptp()
#         self.assertTrue(ptp < 0.1, msg="min -> max diff is %s" % ptp)


class TestDEMTests:
    'Unit tests to verify operations on GeoTIFF format DEMs'

    def setup_method(self):
        self.ras = DEM(SML_TEST_DEM_TIF)

    def test_create_raster(self):
        # validate header path
        assert os.path.exists(self.ras.data_path)

    def test_headers_as_attr(self):
        self.ras.open()
        attrs = ['ncols', 'nrows', 'x_first', 'x_step', 'y_first', 'y_step' ]

        # TODO: are 'projection' and 'datum' attrs needed?
        for a in attrs:
            assert getattr(self.ras, a) is not None

    def test_is_dem(self):
        self.ras = DEM(join(SML_TEST_TIF, 'geo_060619-061002_unw.tif'))
        assert  ~hasattr(self.ras, 'datum')

    def test_open(self):
        assert self.ras.dataset is None
        self.ras.open()
        assert self.ras.dataset is not None
        assert isinstance(self.ras.dataset, Dataset)

        # ensure open cannot be called twice
        with pytest.raises(RasterException):
            self.ras.open()

    # def test_band_fails_with_unopened_raster(self):  # now opening if not open
    #     # test accessing bands with open and unopened datasets
    #     with pytest.raises(RasterException):
    #         self.ras.band

    def test_band_read_with_open_raster(self):
        data = self.ras.band.ReadAsArray()
        assert data.shape == (72, 47)


class TestWriteUnw:

    @classmethod
    @pytest.fixture(autouse=True)
    def setup_class(cls, gamma_params):
        # change the required params
        shared.mkdir_p(gamma_params[C.OUT_DIR])
        from copy import deepcopy
        cls.params = deepcopy(gamma_params)
        cls.params[WORKING_DIR] = common.GAMMA_SML_TEST_DIR
        cls.params[C.PROCESSOR] = 1  # gamma
        cls.params[C.PARALLEL] = 0
        cls.params[C.REF_EST_METHOD] = 1
        cls.params[C.DEM_FILE] = common.SML_TEST_DEM_GAMMA
        cls.params[C.BASE_FILE_LIST] = common.GAMMA_SML_TEST_DIR
        # base_unw_paths need to be geotiffed and multilooked by run_prepifg
        cls.base_unw_paths = tests.common.original_ifg_paths(cls.params[C.IFG_FILE_LIST], cls.params[WORKING_DIR])
        cls.base_unw_paths.append(common.SML_TEST_DEM_GAMMA)

        # dest_paths are tifs that have been geotif converted and multilooked
        conv2tif.main(cls.params)
        prepifg.main(cls.params)

        cls.dest_paths = [Path(cls.params[C.INTERFEROGRAM_DIR]).joinpath(Path(c.sampled_path).name).as_posix()
                          for c in gamma_params[C.INTERFEROGRAM_FILES]]
        cls.dest_paths += [Path(cls.params[C.COHERENCE_DIR]).joinpath(Path(c.sampled_path).name).as_posix()
                          for c in gamma_params[C.COHERENCE_FILE_PATHS]]

        cls.ifgs = [dem_or_ifg(i) for i in cls.dest_paths]
        for i in cls.ifgs:
            i.open()
            i.nodata_value = 0

    @classmethod
    def teardown_class(cls):
        """auto cleaning on"""

    def test_unw_contains_same_data_as_numpy_array(self):
        from datetime import time
        temp_unw = tempfile.mktemp(suffix='.unw')
        temp_tif = tempfile.mktemp(suffix='.tif')

        # setup some header files for use in write_geotif
        dem_header_file = common.SML_TEST_DEM_HDR_GAMMA
        dem_header = gamma.parse_dem_header(dem_header_file)

        header = gamma.parse_epoch_header(
            os.path.join(common.GAMMA_SML_TEST_DIR, '20060828_slc.par'))
        header.update(dem_header)
             
        base_header = gamma.parse_baseline_header(
            os.path.join(common.GAMMA_SML_TEST_DIR, '20060828-20061211_base.par'))
        header.update(base_header)
                
        # insert some dummy data so we are the dem in write_fullres_geotiff is not
        # not activated and ifg write_fullres_geotiff operation works
        header[ifc.PYRATE_TIME_SPAN] = 0
        header[ifc.SECOND_DATE] = 0
        header[ifc.DATA_UNITS] = 'degrees'
        header[ifc.DATA_TYPE] = ifc.ORIG
        header[ifc.SECOND_TIME] = time(10)
        
        # now create arbitrary data
        data = np.random.rand(dem_header[ifc.PYRATE_NROWS], dem_header[ifc.PYRATE_NCOLS])

        # convert numpy array to .unw
        shared.write_unw_from_data_or_geotiff(geotif_or_data=data, dest_unw=temp_unw, ifg_proc=1)
        # convert the .unw to geotif
        shared.write_fullres_geotiff(header=header, data_path=temp_unw, dest=temp_tif, nodata=np.nan)

        # now compare geotiff with original numpy array
        ds = gdal.Open(temp_tif, gdal.GA_ReadOnly)
        data_lv_theta = ds.ReadAsArray()
        ds = None
        np.testing.assert_array_almost_equal(data, data_lv_theta)
        try:
            os.remove(temp_tif)
        except PermissionError:
            print("File opened by another process.")

        try:
            os.remove(temp_unw)
        except PermissionError:
            print("File opened by another process.")

    def test_multilooked_tiffs_converted_to_unw_are_same(self):
        # Get multilooked geotiffs
        geotiffs = list(set(self.dest_paths))
        geotiffs = [g for g in geotiffs if 'dem' not in g]

        # Convert back to .unw
        dest_unws = []
        for g in set(geotiffs):
            dest_unw = os.path.join(self.params[C.OUT_DIR], Path(g).stem + '.unw')
            shared.write_unw_from_data_or_geotiff(geotif_or_data=g, dest_unw=dest_unw, ifg_proc=1)
            dest_unws.append(dest_unw)

        dest_unws_ = []

        for d in dest_unws:
            dest_unws_.append(MultiplePaths(d, self.params))

        # Convert back to tiff
        new_geotiffs_ = conv2tif.do_geotiff(dest_unws_, self.params)
        new_geotiffs = [gt for gt, b in new_geotiffs_]

        # Ensure original multilooked geotiffs and 
        #  unw back to geotiff are the same
        geotiffs.sort(key=lambda x: Path(x).name)
        new_geotiffs.sort(key=lambda x: Path(x).name)

        for g, u in zip(geotiffs, new_geotiffs):
            g_ds = gdal.Open(g)
            u_gs = gdal.Open(u)
            np.testing.assert_array_almost_equal(u_gs.ReadAsArray(), g_ds.ReadAsArray())
            u_gs = None
            g_ds = None

    def test_roipac_raises(self):
        geotiffs = [os.path.join(
            self.params[C.OUT_DIR], os.path.basename(b).split('.')[0] + '_'
                                                   + os.path.basename(b).split('.')[1] + '.tif')
            for b in self.base_unw_paths]

        for g in geotiffs[:1]:
            dest_unw = os.path.join(self.params[C.OUT_DIR], os.path.splitext(g)[0] + '.unw')
            with pytest.raises(NotImplementedError):
                shared.write_unw_from_data_or_geotiff(geotif_or_data=g, dest_unw=dest_unw, ifg_proc=0)


class TestGeodesy:

    def test_utm_zone(self):
        # test some different zones (collected manually)
        for lon in [174.0, 176.5, 179.999, 180.0]:
            assert 60 == _utm_zone(lon)

        for lon in [144.0, 144.1, 146.3456, 149.9999]:
            assert 55 == _utm_zone(lon)

        for lon in [-180.0, -179.275, -176.925]:
            assert 1 == _utm_zone(lon)

        for lon in [-72.0, -66.1]:
            assert 19 == _utm_zone(lon)

        for lon in [0.0, 0.275, 3.925, 5.999]:
            assert 31 == _utm_zone(lon)

    def test_cell_size_polar_region(self):
        # Can't have polar area zones: see http://www.dmap.co.uk/utmworld.htm
        for lat in [-80.1, -85.0, -90.0, 84.1, 85.0, 89.9999, 90.0]:
            with pytest.raises(ValueError):
                cell_size(lat, 0, 0.1, 0.1)

    def test_cell_size_calc(self):
        # test conversion of X|Y_STEP to X|Y_SIZE
        x_deg = 0.000833333
        y_deg = -x_deg

        approx = 90.0 # x_deg is approx 90m
        exp_low = approx - (.15 * approx) # assumed tolerance
        exp_high = approx + (.15 * approx)

        latlons = [(10.0, 15.0), (-10.0, 15.0), (10.0, -15.0), (-10.0, -15.0),
            (178.0, 33.0), (-178.0, 33.0), (178.0, -33.0), (-178.0, -33.0) ]

        for lon, lat in latlons:
            xs, ys = cell_size(lat, lon, x_deg, y_deg)
            for s in (xs, ys):
                assert s > 0, "size=%s" % s
                assert s > exp_low, "size=%s" % s
                assert s < exp_high, "size=%s" % s


@pytest.fixture(params=[0, 1])
def parallel(request):
    return request.param


def get_random_string(length):
    letters = string.ascii_lowercase
    result_str = ''.join(random.choice(letters) for _ in range(length))
    return result_str


def _data_types():
    return {
        'str': [get_random_string(np.random.randint(2, 10)) for _ in range(20)],
        'int': np.arange(20),
        'ndarray': [np.random.randint(low=0, high=10, size=(2, 3), dtype=np.uint8) for _ in range(20)]
        }


data_types = mpiops.run_once(_data_types)  # required otherwise differnt arrays are generated in each mpi process


@pytest.fixture(params=list(data_types.keys()))
def data_type(request):
    return request.param


def test_tiles_split(parallel, data_type):
    params = {
        C.TILES: data_types[data_type],
        C.PARALLEL: parallel,
        C.LOG_LEVEL: 'INFO',
        'multiplier': 2,
        C.PROCESSES: 4
    }

    def func(tile, params):
        return tile * params['multiplier']

    ret = tiles_split(func, params)

    expected_ret = np.array([item*params['multiplier'] for item in data_types[data_type]], dtype=object)
    np.testing.assert_array_equal(ret, expected_ret)


def test_convert_to_radians():
    import math
    data = np.random.randint(1, 10, (4, 5))
    wavelength = 10.5
    ret = shared.convert_mm_to_radians(data, wavelength)
    expected = data * (4 * math.pi) / wavelength /ifc.MM_PER_METRE
    np.testing.assert_array_almost_equal(ret, expected)


def test_convert_to_radians_ifg(ten_geotiffs):
    for g in ten_geotiffs[:2]:
        ifg = Ifg(g)
        ifg.open()
        md = ifg.dataset.GetMetadata()
        assert ifc.DATA_TYPE in md
        assert md[ifc.DATA_TYPE] == ifc.ORIG
        assert md[ifc.DATA_UNITS] == shared.RADIANS
        rad_data = ifg.phase_data
        ifg.convert_to_mm()
        assert ifg.meta_data[ifc.DATA_UNITS] == shared.MILLIMETRES
        ifg.convert_to_radians()
        assert md[ifc.DATA_UNITS] == shared.RADIANS
        np.testing.assert_array_almost_equal(rad_data, ifg.phase_data, decimal=4)

back to top

Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API