Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • 9dcd810
  • /
  • test_gamma.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge Iframe embedding
swh:1:cnt:9537b3602e7a1a7b950c2876b2a0e7e9c0c2a039
directory badge Iframe embedding
swh:1:dir:9dcd810517f5ee788fdc353347bb3e57774a54b2

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
test_gamma.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2020 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
This Python module contains tests for the gamma.py PyRate module.
"""
import os
import shutil
import tempfile
from datetime import date, time
from os.path import join
import pytest
from pathlib import Path
import numpy as np
from numpy.testing import assert_array_almost_equal
from osgeo import gdal

import pyrate.configuration
import pyrate.core.ifgconstants as ifc
from pyrate.core import shared, config as cf, gamma
from pyrate.core.config import (
    DEM_HEADER_FILE,
    NO_DATA_VALUE,
    OBS_DIR,
    IFG_FILE_LIST,
    PROCESSOR,
    OUT_DIR,
    SLC_DIR)
from pyrate import prepifg, conv2tif
from pyrate.core.shared import write_fullres_geotiff, GeotiffException
from pyrate.constants import PYRATEPATH

from tests.common import manipulate_test_conf
from pyrate.configuration import Configuration
from tests.common import GAMMA_TEST_DIR
from tests.common import TEMPDIR
from tests.common import small_data_setup

gdal.UseExceptions()

LIGHTSPEED = 3e8  # approx


class TestGammaCommandLineTests:

    def setup_method(self):
        self.base = join(PYRATEPATH, 'tests', 'test_data', 'gamma')
        self.hdr = join(self.base, 'dem16x20raw.dem.par')
        temp_text = tempfile.mktemp()
        self.confFile = os.path.join(TEMPDIR,'{}/gamma_test.cfg'.format(temp_text))
        self.ifgListFile = os.path.join(TEMPDIR, '{}/gamma_ifg.list'.format(temp_text))
        self.base_dir = os.path.dirname(self.confFile)
        shared.mkdir_p(self.base_dir)

    def teardown_method(self):
        try:
            os.remove(self.exp_path)
        except:
            pass
        shutil.rmtree(self.base_dir, ignore_errors=True)

    def makeInputFiles(self, data):
        with open(self.confFile, 'w') as conf:
            conf.write('{}: {}\n'.format(DEM_HEADER_FILE, self.hdr))
            conf.write('{}: {}\n'.format(NO_DATA_VALUE, '0.0'))
            conf.write('{}: {}\n'.format(OBS_DIR, self.base_dir))
            conf.write('{}: {}\n'.format(IFG_FILE_LIST, self.ifgListFile))
            conf.write('{}: {}\n'.format(PROCESSOR, '1'))
            conf.write('{}: {}\n'.format(OUT_DIR, self.base_dir))
            conf.write('{}: {}\n'.format(SLC_DIR, ''))
        with open(self.ifgListFile, 'w') as ifgl:
            ifgl.write(data)


class TestGammaToGeoTiff:
    """Tests conversion of GAMMA rasters to custom PyRate GeoTIFF"""

    @classmethod
    def setup_method(cls):
        # create common combined header obj so the headers are only read once
        # tricker: needs both ifg headers, and DEM one for the extents
        filenames = ['r20090713_VV.slc.par', 'r20090817_VV.slc.par']
        hdr_paths = [join(GAMMA_TEST_DIR, f) for f in filenames]
        hdrs = [gamma.parse_epoch_header(p) for p in hdr_paths]
        dem_hdr_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')

        cls.DEM_HDR = gamma.parse_dem_header(dem_hdr_path)
        cls.COMBINED = gamma.combine_headers(*hdrs, dem_hdr=cls.DEM_HDR)

    def teardown_method(self):
        if os.path.exists(self.dest):
            os.remove(self.dest)

    def test_to_geotiff_dem(self):
        hdr_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')
        hdr = gamma.parse_dem_header(hdr_path)
        data_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem')
        self.dest = os.path.join(TEMPDIR, "tmp_gamma_dem.tif")

        write_fullres_geotiff(hdr, data_path, self.dest, nodata=0)
        exp_path = join(GAMMA_TEST_DIR, 'dem16x20_subset_from_gamma.tif')
        exp_ds = gdal.Open(exp_path)
        ds = gdal.Open(self.dest)

        # compare data and geographic headers
        # HACK: round  expected to nearest integer
        assert_array_almost_equal(np.rint(exp_ds.ReadAsArray()),
                                  ds.ReadAsArray())
        self.compare_rasters(exp_ds, ds)
        md = ds.GetMetadata()
        assert md['AREA_OR_POINT'] == 'Area'

    def test_to_geotiff_ifg(self):
        self.dest = os.path.join(TEMPDIR, 'tmp_gamma_ifg.tif')
        data_path = join(GAMMA_TEST_DIR,
                         '16x20_20090713-20090817_VV_4rlks_utm.unw')
        write_fullres_geotiff(self.COMBINED, data_path, self.dest, nodata=0)

        ds = gdal.Open(self.dest)
        exp_path = join(GAMMA_TEST_DIR, '16x20_20090713-20090817_VV_4rlks_utm.tif')
        exp_ds = gdal.Open(exp_path)

        # compare data and geographic headers
        assert_array_almost_equal(exp_ds.ReadAsArray(), ds.ReadAsArray())
        self.compare_rasters(ds, exp_ds)

        md = ds.GetMetadata()
        assert len(md) == 11 # 11 metadata items
        assert md[ifc.FIRST_DATE] == str(date(2009, 7, 13))
        assert md[ifc.SECOND_DATE] == str(date(2009, 8, 17))
        assert md[ifc.PYRATE_TIME_SPAN] == str(35 / ifc.DAYS_PER_YEAR)
        # TODO test failing
        # self.assertTrue(md[ifc.FIRST_DATE] == str(12))
        # TODO test failing
        # self.assertTrue(md[ifc.SECOND_DATE] == str(time(12)))

        wavelen = float(md[ifc.PYRATE_WAVELENGTH_METRES])
        assert wavelen == pytest.approx(0.05627457792190739)

    def test_to_geotiff_wrong_input_data(self):
        # use TIF, not UNW for data
        self.dest = os.path.join(TEMPDIR, 'tmp_gamma_ifg.tif')
        data_path = join(GAMMA_TEST_DIR, '16x20_20090713-20090817_VV_4rlks_utm.tif')
        with pytest.raises(GeotiffException):
            write_fullres_geotiff(self.COMBINED, data_path, self.dest, nodata=0)

    def compare_rasters(self, ds, exp_ds):
        band = ds.GetRasterBand(1)
        exp_band = exp_ds.GetRasterBand(1)

        nodata = band.GetNoDataValue()
        assert ~(nodata is None)
        assert exp_band.GetNoDataValue() == nodata

        pj = ds.GetProjection()
        assert 'WGS 84' in pj
        assert exp_ds.GetProjection() == pj
        for exp, act in zip(exp_ds.GetGeoTransform(), ds.GetGeoTransform()):
            assert exp == pytest.approx(act, abs=0.0001)

    def test_bad_projection(self):
        hdr = self.DEM_HDR.copy()
        hdr[ifc.PYRATE_DATUM] = 'nonexistent projection'
        data_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem')
        self.dest = os.path.join(TEMPDIR, 'tmp_gamma_dem2.tif')
        with pytest.raises(GeotiffException):
            write_fullres_geotiff(hdr, data_path, self.dest, nodata=0)


class TestGammaHeaderParsingTests:
    'Tests conversion of GAMMA headers to Py dicts'

    def test_parse_gamma_epoch_header(self):
        # minimal required headers are:
        # date:      2009  7 13
        # radar_frequency:        5.3310040e+09   Hz
        path = join(GAMMA_TEST_DIR, 'r20090713_VV.slc.par')
        hdrs = gamma.parse_epoch_header(path)

        exp_date = date(2009, 7, 13)
        assert hdrs[ifc.FIRST_DATE] == exp_date

        exp_wavelen = LIGHTSPEED / 5.3310040e+09
        assert hdrs[ifc.PYRATE_WAVELENGTH_METRES] == exp_wavelen

        incidence_angle = 22.9671
        assert hdrs[ifc.PYRATE_INCIDENCE_DEGREES] == incidence_angle

    def test_parse_gamma_dem_header(self):
        path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')
        hdrs = gamma.parse_dem_header(path)

        self.assert_equal(hdrs[ifc.PYRATE_NCOLS], 16)
        self.assert_equal(hdrs[ifc.PYRATE_NROWS], 20)
        self.assert_equal(hdrs[ifc.PYRATE_LAT], -33.3831945)
        self.assert_equal(hdrs[ifc.PYRATE_LONG], 150.3870833)
        self.assert_equal(hdrs[ifc.PYRATE_X_STEP], 6.9444445e-05)
        self.assert_equal(hdrs[ifc.PYRATE_Y_STEP], -6.9444445e-05)

    @staticmethod
    def assert_equal(arg1, arg2):
        assert arg1 == arg2


# Test data for the epoch header combination
H0 = {ifc.FIRST_DATE : date(2009, 7, 13),
      ifc.FIRST_DATE : time(12),
      ifc.PYRATE_WAVELENGTH_METRES: 1.8,
      ifc.PYRATE_INCIDENCE_DEGREES: 35.565,
      }

H1 = {ifc.FIRST_DATE : date(2009, 8, 17),
      ifc.FIRST_DATE : time(12, 10, 10),
      ifc.PYRATE_WAVELENGTH_METRES: 1.8,
      ifc.PYRATE_INCIDENCE_DEGREES: 35.56,
      }

H1_ERR1 = {ifc.FIRST_DATE : date(2009, 8, 17),
          ifc.FIRST_DATE : time(12),
          ifc.PYRATE_WAVELENGTH_METRES: 2.4,
          ifc.PYRATE_INCIDENCE_DEGREES: 35.56,
          }

H1_ERR2 = {ifc.FIRST_DATE : date(2009, 8, 17),
          ifc.FIRST_DATE : time(12),
          ifc.PYRATE_WAVELENGTH_METRES: 1.8,
          ifc.PYRATE_INCIDENCE_DEGREES: 35.76,
          }


class TestHeaderCombination:
    'Tests GAMMA epoch and DEM headers can be combined into a single Py dict'

    def setup_method(self):
        self.err = gamma.GammaException
        dem_hdr_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')
        self.dh = gamma.parse_dem_header(dem_hdr_path)

    @staticmethod
    def assert_equal(arg1, arg2):
        assert arg1 == arg2

    def test_combine_headers(self):
        filenames = ['r20090713_VV.slc.par', 'r20090817_VV.slc.par']
        paths = [join(GAMMA_TEST_DIR, p) for p in filenames]
        hdr0, hdr1 = [gamma.parse_epoch_header(p) for p in paths]

        chdr = gamma.combine_headers(hdr0, hdr1, self.dh)

        exp_timespan = (18 + 17) / ifc.DAYS_PER_YEAR
        self.assert_equal(chdr[ifc.PYRATE_TIME_SPAN], exp_timespan)

        exp_date = date(2009, 7, 13)
        self.assert_equal(chdr[ifc.FIRST_DATE], exp_date)
        exp_date2 = date(2009, 8, 17)
        self.assert_equal(chdr[ifc.SECOND_DATE], exp_date2)

        exp_wavelen = LIGHTSPEED / 5.3310040e+09
        self.assert_equal(chdr[ifc.PYRATE_WAVELENGTH_METRES], exp_wavelen)

    def test_fail_non_dict_header(self):
        self.assertRaises(gamma.combine_headers, H0, '', self.dh)
        self.assertRaises(gamma.combine_headers, '', H0, self.dh)
        self.assertRaises(gamma.combine_headers, H0, H1, None)
        self.assertRaises(gamma.combine_headers, H0, H1, '')

    def test_fail_mismatching_wavelength(self):
        self.assertRaises(gamma.combine_headers, H0, H1_ERR1, self.dh)
        
    def test_fail_mismatching_incidence(self):
        self.assertRaises(gamma.combine_headers, H0, H1_ERR2, self.dh)

    def test_fail_same_date(self):
        self.assertRaises(gamma.combine_headers, H0, H0, self.dh)

    def test_fail_bad_date_order(self):
        with pytest.raises(self.err):
            gamma.combine_headers(H1, H0, self.dh)

    def assertRaises(self, func, * args):
        with pytest.raises(self.err):
            func(* args)


ifg_glob_suffix = "*_ifg.tif"
coh_glob_suffix = "*_coh.tif"


@pytest.fixture(scope='module')
def parallel_ifgs(gamma_conf):

    tdir = Path(tempfile.mkdtemp())

    params_p = manipulate_test_conf(gamma_conf, tdir)
    params_p[cf.PARALLEL] = 1

    output_conf_file = 'conf.conf'
    output_conf = tdir.joinpath(output_conf_file)
    pyrate.configuration.write_config_file(params=params_p, output_conf_file=output_conf)

    params_p = Configuration(output_conf).__dict__

    conv2tif.main(params_p)
    prepifg.main(params_p)

    parallel_df = list(Path(tdir).joinpath('out').glob(ifg_glob_suffix))
    parallel_coh_files = list(Path(tdir).joinpath('out').glob(coh_glob_suffix))

    p_ifgs = small_data_setup(datafiles=parallel_df + parallel_coh_files)
    yield p_ifgs

    shutil.rmtree(params_p[cf.OBS_DIR], ignore_errors=True)


@pytest.fixture(scope='module')
def series_ifgs(gamma_conf):

    print('======================setup series==========================')
    tdir = Path(tempfile.mkdtemp())

    params_s = manipulate_test_conf(gamma_conf, tdir)
    params_s[cf.PARALLEL] = 0

    output_conf_file = 'conf.conf'
    output_conf = tdir.joinpath(output_conf_file)
    pyrate.configuration.write_config_file(params=params_s, output_conf_file=output_conf)

    params_s = Configuration(output_conf).__dict__

    conv2tif.main(params_s)

    prepifg.main(params_s)

    serial_ifgs = list(Path(tdir).joinpath('out').glob(ifg_glob_suffix))
    coh_files = list(Path(tdir).joinpath('out').glob(coh_glob_suffix))

    s_ifgs = small_data_setup(datafiles=serial_ifgs + coh_files)
    yield s_ifgs
    print('======================teardown series==========================')

    shutil.rmtree(params_s[cf.OBS_DIR], ignore_errors=True)


def test_equality(series_ifgs, parallel_ifgs):
    for s, p in zip(series_ifgs, parallel_ifgs):
        np.testing.assert_array_almost_equal(s.phase_data, p.phase_data)


def test_meta_data_exists(series_ifgs, parallel_ifgs):
    for i, (s, p) in enumerate(zip(series_ifgs, parallel_ifgs)):
        # all metadata equal
        assert s.meta_data == p.meta_data
        # test that DATA_TYPE exists in metadata
        assert ifc.DATA_TYPE in s.meta_data.keys()
        # test that DATA_TYPE is MULTILOOKED
        assert (s.meta_data[ifc.DATA_TYPE] == ifc.MULTILOOKED) or \
               (s.meta_data[ifc.DATA_TYPE] == ifc.MULTILOOKED_COH)
    assert i + 1 == 34

back to top

Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API