test_gamma.py
# This Python module is part of the PyRate software package.
#
# Copyright 2020 Geoscience Australia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This Python module contains tests for the gamma.py PyRate module.
"""
import os
import shutil
import tempfile
from datetime import date, time
from os.path import join
import pytest
from pathlib import Path
import numpy as np
from numpy.testing import assert_array_almost_equal
from osgeo import gdal
import pyrate.configuration
import pyrate.core.ifgconstants as ifc
from pyrate.core import shared, config as cf, gamma
from pyrate.core.config import (
DEM_HEADER_FILE,
NO_DATA_VALUE,
OBS_DIR,
IFG_FILE_LIST,
PROCESSOR,
OUT_DIR,
SLC_DIR)
from pyrate import prepifg, conv2tif
from pyrate.core.shared import write_fullres_geotiff, GeotiffException
from pyrate.constants import PYRATEPATH
from tests.common import manipulate_test_conf
from pyrate.configuration import Configuration
from tests.common import GAMMA_TEST_DIR
from tests.common import TEMPDIR
from tests.common import small_data_setup
gdal.UseExceptions()
LIGHTSPEED = 3e8 # approx
class TestGammaCommandLineTests:
def setup_method(self):
self.base = join(PYRATEPATH, 'tests', 'test_data', 'gamma')
self.hdr = join(self.base, 'dem16x20raw.dem.par')
temp_text = tempfile.mktemp()
self.confFile = os.path.join(TEMPDIR,'{}/gamma_test.cfg'.format(temp_text))
self.ifgListFile = os.path.join(TEMPDIR, '{}/gamma_ifg.list'.format(temp_text))
self.base_dir = os.path.dirname(self.confFile)
shared.mkdir_p(self.base_dir)
def teardown_method(self):
try:
os.remove(self.exp_path)
except:
pass
shutil.rmtree(self.base_dir, ignore_errors=True)
def makeInputFiles(self, data):
with open(self.confFile, 'w') as conf:
conf.write('{}: {}\n'.format(DEM_HEADER_FILE, self.hdr))
conf.write('{}: {}\n'.format(NO_DATA_VALUE, '0.0'))
conf.write('{}: {}\n'.format(OBS_DIR, self.base_dir))
conf.write('{}: {}\n'.format(IFG_FILE_LIST, self.ifgListFile))
conf.write('{}: {}\n'.format(PROCESSOR, '1'))
conf.write('{}: {}\n'.format(OUT_DIR, self.base_dir))
conf.write('{}: {}\n'.format(SLC_DIR, ''))
with open(self.ifgListFile, 'w') as ifgl:
ifgl.write(data)
class TestGammaToGeoTiff:
"""Tests conversion of GAMMA rasters to custom PyRate GeoTIFF"""
@classmethod
def setup_method(cls):
# create common combined header obj so the headers are only read once
# tricker: needs both ifg headers, and DEM one for the extents
filenames = ['r20090713_VV.slc.par', 'r20090817_VV.slc.par']
hdr_paths = [join(GAMMA_TEST_DIR, f) for f in filenames]
hdrs = [gamma.parse_epoch_header(p) for p in hdr_paths]
dem_hdr_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')
cls.DEM_HDR = gamma.parse_dem_header(dem_hdr_path)
cls.COMBINED = gamma.combine_headers(*hdrs, dem_hdr=cls.DEM_HDR)
def teardown_method(self):
if os.path.exists(self.dest):
os.remove(self.dest)
def test_to_geotiff_dem(self):
hdr_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')
hdr = gamma.parse_dem_header(hdr_path)
data_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem')
self.dest = os.path.join(TEMPDIR, "tmp_gamma_dem.tif")
write_fullres_geotiff(hdr, data_path, self.dest, nodata=0)
exp_path = join(GAMMA_TEST_DIR, 'dem16x20_subset_from_gamma.tif')
exp_ds = gdal.Open(exp_path)
ds = gdal.Open(self.dest)
# compare data and geographic headers
# HACK: round expected to nearest integer
assert_array_almost_equal(np.rint(exp_ds.ReadAsArray()),
ds.ReadAsArray())
self.compare_rasters(exp_ds, ds)
md = ds.GetMetadata()
assert md['AREA_OR_POINT'] == 'Area'
def test_to_geotiff_ifg(self):
self.dest = os.path.join(TEMPDIR, 'tmp_gamma_ifg.tif')
data_path = join(GAMMA_TEST_DIR,
'16x20_20090713-20090817_VV_4rlks_utm.unw')
write_fullres_geotiff(self.COMBINED, data_path, self.dest, nodata=0)
ds = gdal.Open(self.dest)
exp_path = join(GAMMA_TEST_DIR, '16x20_20090713-20090817_VV_4rlks_utm.tif')
exp_ds = gdal.Open(exp_path)
# compare data and geographic headers
assert_array_almost_equal(exp_ds.ReadAsArray(), ds.ReadAsArray())
self.compare_rasters(ds, exp_ds)
md = ds.GetMetadata()
assert len(md) == 11 # 11 metadata items
assert md[ifc.FIRST_DATE] == str(date(2009, 7, 13))
assert md[ifc.SECOND_DATE] == str(date(2009, 8, 17))
assert md[ifc.PYRATE_TIME_SPAN] == str(35 / ifc.DAYS_PER_YEAR)
# TODO test failing
# self.assertTrue(md[ifc.FIRST_DATE] == str(12))
# TODO test failing
# self.assertTrue(md[ifc.SECOND_DATE] == str(time(12)))
wavelen = float(md[ifc.PYRATE_WAVELENGTH_METRES])
assert wavelen == pytest.approx(0.05627457792190739)
def test_to_geotiff_wrong_input_data(self):
# use TIF, not UNW for data
self.dest = os.path.join(TEMPDIR, 'tmp_gamma_ifg.tif')
data_path = join(GAMMA_TEST_DIR, '16x20_20090713-20090817_VV_4rlks_utm.tif')
with pytest.raises(GeotiffException):
write_fullres_geotiff(self.COMBINED, data_path, self.dest, nodata=0)
def compare_rasters(self, ds, exp_ds):
band = ds.GetRasterBand(1)
exp_band = exp_ds.GetRasterBand(1)
nodata = band.GetNoDataValue()
assert ~(nodata is None)
assert exp_band.GetNoDataValue() == nodata
pj = ds.GetProjection()
assert 'WGS 84' in pj
assert exp_ds.GetProjection() == pj
for exp, act in zip(exp_ds.GetGeoTransform(), ds.GetGeoTransform()):
assert exp == pytest.approx(act, abs=0.0001)
def test_bad_projection(self):
hdr = self.DEM_HDR.copy()
hdr[ifc.PYRATE_DATUM] = 'nonexistent projection'
data_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem')
self.dest = os.path.join(TEMPDIR, 'tmp_gamma_dem2.tif')
with pytest.raises(GeotiffException):
write_fullres_geotiff(hdr, data_path, self.dest, nodata=0)
class TestGammaHeaderParsingTests:
'Tests conversion of GAMMA headers to Py dicts'
def test_parse_gamma_epoch_header(self):
# minimal required headers are:
# date: 2009 7 13
# radar_frequency: 5.3310040e+09 Hz
path = join(GAMMA_TEST_DIR, 'r20090713_VV.slc.par')
hdrs = gamma.parse_epoch_header(path)
exp_date = date(2009, 7, 13)
assert hdrs[ifc.FIRST_DATE] == exp_date
exp_wavelen = LIGHTSPEED / 5.3310040e+09
assert hdrs[ifc.PYRATE_WAVELENGTH_METRES] == exp_wavelen
incidence_angle = 22.9671
assert hdrs[ifc.PYRATE_INCIDENCE_DEGREES] == incidence_angle
def test_parse_gamma_dem_header(self):
path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')
hdrs = gamma.parse_dem_header(path)
self.assert_equal(hdrs[ifc.PYRATE_NCOLS], 16)
self.assert_equal(hdrs[ifc.PYRATE_NROWS], 20)
self.assert_equal(hdrs[ifc.PYRATE_LAT], -33.3831945)
self.assert_equal(hdrs[ifc.PYRATE_LONG], 150.3870833)
self.assert_equal(hdrs[ifc.PYRATE_X_STEP], 6.9444445e-05)
self.assert_equal(hdrs[ifc.PYRATE_Y_STEP], -6.9444445e-05)
@staticmethod
def assert_equal(arg1, arg2):
assert arg1 == arg2
# Test data for the epoch header combination
H0 = {ifc.FIRST_DATE : date(2009, 7, 13),
ifc.FIRST_DATE : time(12),
ifc.PYRATE_WAVELENGTH_METRES: 1.8,
ifc.PYRATE_INCIDENCE_DEGREES: 35.565,
}
H1 = {ifc.FIRST_DATE : date(2009, 8, 17),
ifc.FIRST_DATE : time(12, 10, 10),
ifc.PYRATE_WAVELENGTH_METRES: 1.8,
ifc.PYRATE_INCIDENCE_DEGREES: 35.56,
}
H1_ERR1 = {ifc.FIRST_DATE : date(2009, 8, 17),
ifc.FIRST_DATE : time(12),
ifc.PYRATE_WAVELENGTH_METRES: 2.4,
ifc.PYRATE_INCIDENCE_DEGREES: 35.56,
}
H1_ERR2 = {ifc.FIRST_DATE : date(2009, 8, 17),
ifc.FIRST_DATE : time(12),
ifc.PYRATE_WAVELENGTH_METRES: 1.8,
ifc.PYRATE_INCIDENCE_DEGREES: 35.76,
}
class TestHeaderCombination:
'Tests GAMMA epoch and DEM headers can be combined into a single Py dict'
def setup_method(self):
self.err = gamma.GammaException
dem_hdr_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')
self.dh = gamma.parse_dem_header(dem_hdr_path)
@staticmethod
def assert_equal(arg1, arg2):
assert arg1 == arg2
def test_combine_headers(self):
filenames = ['r20090713_VV.slc.par', 'r20090817_VV.slc.par']
paths = [join(GAMMA_TEST_DIR, p) for p in filenames]
hdr0, hdr1 = [gamma.parse_epoch_header(p) for p in paths]
chdr = gamma.combine_headers(hdr0, hdr1, self.dh)
exp_timespan = (18 + 17) / ifc.DAYS_PER_YEAR
self.assert_equal(chdr[ifc.PYRATE_TIME_SPAN], exp_timespan)
exp_date = date(2009, 7, 13)
self.assert_equal(chdr[ifc.FIRST_DATE], exp_date)
exp_date2 = date(2009, 8, 17)
self.assert_equal(chdr[ifc.SECOND_DATE], exp_date2)
exp_wavelen = LIGHTSPEED / 5.3310040e+09
self.assert_equal(chdr[ifc.PYRATE_WAVELENGTH_METRES], exp_wavelen)
def test_fail_non_dict_header(self):
self.assertRaises(gamma.combine_headers, H0, '', self.dh)
self.assertRaises(gamma.combine_headers, '', H0, self.dh)
self.assertRaises(gamma.combine_headers, H0, H1, None)
self.assertRaises(gamma.combine_headers, H0, H1, '')
def test_fail_mismatching_wavelength(self):
self.assertRaises(gamma.combine_headers, H0, H1_ERR1, self.dh)
def test_fail_mismatching_incidence(self):
self.assertRaises(gamma.combine_headers, H0, H1_ERR2, self.dh)
def test_fail_same_date(self):
self.assertRaises(gamma.combine_headers, H0, H0, self.dh)
def test_fail_bad_date_order(self):
with pytest.raises(self.err):
gamma.combine_headers(H1, H0, self.dh)
def assertRaises(self, func, * args):
with pytest.raises(self.err):
func(* args)
ifg_glob_suffix = "*_ifg.tif"
coh_glob_suffix = "*_coh.tif"
@pytest.fixture(scope='module')
def parallel_ifgs(gamma_conf):
tdir = Path(tempfile.mkdtemp())
params_p = manipulate_test_conf(gamma_conf, tdir)
params_p[cf.PARALLEL] = 1
output_conf_file = 'conf.conf'
output_conf = tdir.joinpath(output_conf_file)
pyrate.configuration.write_config_file(params=params_p, output_conf_file=output_conf)
params_p = Configuration(output_conf).__dict__
conv2tif.main(params_p)
prepifg.main(params_p)
parallel_df = list(Path(tdir).joinpath('out').glob(ifg_glob_suffix))
parallel_coh_files = list(Path(tdir).joinpath('out').glob(coh_glob_suffix))
p_ifgs = small_data_setup(datafiles=parallel_df + parallel_coh_files)
yield p_ifgs
shutil.rmtree(params_p[cf.OBS_DIR], ignore_errors=True)
@pytest.fixture(scope='module')
def series_ifgs(gamma_conf):
print('======================setup series==========================')
tdir = Path(tempfile.mkdtemp())
params_s = manipulate_test_conf(gamma_conf, tdir)
params_s[cf.PARALLEL] = 0
output_conf_file = 'conf.conf'
output_conf = tdir.joinpath(output_conf_file)
pyrate.configuration.write_config_file(params=params_s, output_conf_file=output_conf)
params_s = Configuration(output_conf).__dict__
conv2tif.main(params_s)
prepifg.main(params_s)
serial_ifgs = list(Path(tdir).joinpath('out').glob(ifg_glob_suffix))
coh_files = list(Path(tdir).joinpath('out').glob(coh_glob_suffix))
s_ifgs = small_data_setup(datafiles=serial_ifgs + coh_files)
yield s_ifgs
print('======================teardown series==========================')
shutil.rmtree(params_s[cf.OBS_DIR], ignore_errors=True)
def test_equality(series_ifgs, parallel_ifgs):
for s, p in zip(series_ifgs, parallel_ifgs):
np.testing.assert_array_almost_equal(s.phase_data, p.phase_data)
def test_meta_data_exists(series_ifgs, parallel_ifgs):
for i, (s, p) in enumerate(zip(series_ifgs, parallel_ifgs)):
# all metadata equal
assert s.meta_data == p.meta_data
# test that DATA_TYPE exists in metadata
assert ifc.DATA_TYPE in s.meta_data.keys()
# test that DATA_TYPE is MULTILOOKED
assert (s.meta_data[ifc.DATA_TYPE] == ifc.MULTILOOKED) or \
(s.meta_data[ifc.DATA_TYPE] == ifc.MULTILOOKED_COH)
assert i + 1 == 34