1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
# This Python module is part of the PyRate software package.
#
# Copyright 2020 Geoscience Australia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This Python module contains tests for the gamma.py PyRate module.
"""
import os
import shutil
import tempfile
from datetime import date, time
from os.path import join
import pytest
from pathlib import Path
import numpy as np
from numpy.testing import assert_array_almost_equal
from osgeo import gdal
import pyrate.configuration
import pyrate.core.ifgconstants as ifc
from pyrate.core import shared, gamma
from pyrate import prepifg, conv2tif
from pyrate.core.shared import write_fullres_geotiff, GeotiffException
import pyrate.constants as C
from pyrate.constants import PYRATEPATH, IFG_FILE_LIST, PROCESSOR, OUT_DIR, DEM_HEADER_FILE, \
NO_DATA_VALUE, BASE_FILE_LIST
from tests.common import manipulate_test_conf
from pyrate.configuration import Configuration
from tests.common import GAMMA_TEST_DIR, WORKING_DIR
from tests.common import TEMPDIR
from tests.common import small_data_setup
# Ask the GDAL bindings to raise Python exceptions on errors.
gdal.UseExceptions()
# Rounded speed of light; the tests divide it by the radar frequency to get
# the wavelength they expect the header parser to report.
LIGHTSPEED = 3e8 # approx
class TestGammaCommandLineTests:
    """Scaffolding for GAMMA command-line tests.

    ``setup_method`` creates a private scratch directory and derives the
    configuration / list file paths inside it, ``makeInputFiles`` writes a
    minimal configuration plus the interferogram list, and
    ``teardown_method`` removes everything again.
    """

    def setup_method(self):
        """Create a fresh scratch directory and the paths of the files in it."""
        self.base = join(PYRATEPATH, 'tests', 'test_data', 'gamma')
        self.hdr = join(self.base, 'dem16x20raw.dem.par')
        # mkdtemp() creates the directory itself; mktemp() is deprecated
        # because it only returns an unused *name*, which is open to races.
        # The default temp location is used because mktemp() already returned
        # an absolute path, so joining it onto TEMPDIR had no effect.
        self.base_dir = tempfile.mkdtemp()
        self.confFile = os.path.join(self.base_dir, 'gamma_test.cfg')
        self.ifgListFile = os.path.join(self.base_dir, 'gamma_ifg.list')
        self.baseListFile = os.path.join(self.base_dir, 'gamma_base.list')

    def teardown_method(self):
        """Remove the optional expected-output file and the scratch directory."""
        # ``exp_path`` is only present if a test chose to set it.
        exp_path = getattr(self, 'exp_path', None)
        if exp_path is not None:
            try:
                os.remove(exp_path)
            except OSError:
                # best-effort cleanup: the file may never have been written
                pass
        shutil.rmtree(self.base_dir, ignore_errors=True)

    def makeInputFiles(self, data):
        """Write the test configuration file and the interferogram list.

        :param data: text written verbatim to the interferogram list file.

        The baseline list path is recorded in the configuration but the file
        itself is not created here.
        """
        settings = (
            (DEM_HEADER_FILE, self.hdr),
            (NO_DATA_VALUE, '0.0'),
            (WORKING_DIR, self.base_dir),
            (IFG_FILE_LIST, self.ifgListFile),
            (BASE_FILE_LIST, self.baseListFile),
            (PROCESSOR, '1'),
            (OUT_DIR, self.base_dir),
        )
        with open(self.confFile, 'w') as conf:
            for key, value in settings:
                conf.write('{}: {}\n'.format(key, value))
        with open(self.ifgListFile, 'w') as ifgl:
            ifgl.write(data)
class TestGammaToGeoTiff:
    """Tests conversion of GAMMA rasters to custom PyRate GeoTIFF"""

    @classmethod
    def setup_method(cls):
        """Parse the shared headers and store them as class attributes.

        The combined header needs both epoch headers, the DEM header (for the
        extents) and the baseline header.
        """
        filenames = ['r20090713_VV.slc.par', 'r20090817_VV.slc.par']
        hdr_paths = [join(GAMMA_TEST_DIR, f) for f in filenames]
        hdrs = [gamma.parse_epoch_header(p) for p in hdr_paths]
        dem_hdr_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')
        base_hdr_path = join(GAMMA_TEST_DIR, '20160114-20160126_base.par')
        cls.DEM_HDR = gamma.parse_dem_header(dem_hdr_path)
        cls.BASE_HDR = gamma.parse_baseline_header(base_hdr_path)
        cls.COMBINED = gamma.combine_headers(*hdrs, dem_hdr=cls.DEM_HDR, base_hdr=cls.BASE_HDR)

    def teardown_method(self):
        """Delete the GeoTIFF written by the test, if there is one."""
        # A test that fails before assigning ``self.dest`` must not turn the
        # teardown into a second, unrelated AttributeError.
        dest = getattr(self, 'dest', None)
        if dest is not None and os.path.exists(dest):
            os.remove(dest)

    def test_to_geotiff_dem(self):
        """A GAMMA DEM converts to a GeoTIFF matching the reference file."""
        hdr_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par')
        hdr = gamma.parse_dem_header(hdr_path)
        data_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem')
        self.dest = os.path.join(TEMPDIR, "tmp_gamma_dem.tif")

        write_fullres_geotiff(hdr, data_path, self.dest, nodata=0)
        exp_path = join(GAMMA_TEST_DIR, 'dem16x20_subset_from_gamma.tif')
        exp_ds = gdal.Open(exp_path)
        ds = gdal.Open(self.dest)

        # compare data and geographic headers
        # HACK: round expected to nearest integer
        assert_array_almost_equal(np.rint(exp_ds.ReadAsArray()),
                                  ds.ReadAsArray())
        self.compare_rasters(exp_ds, ds)
        md = ds.GetMetadata()
        assert md['AREA_OR_POINT'] == 'Area'

    def test_to_geotiff_ifg(self):
        """A GAMMA interferogram converts with the expected data and metadata."""
        self.dest = os.path.join(TEMPDIR, 'tmp_gamma_ifg.tif')
        data_path = join(GAMMA_TEST_DIR,
                         '16x20_20090713-20090817_VV_4rlks_utm.unw')
        write_fullres_geotiff(self.COMBINED, data_path, self.dest, nodata=0)

        ds = gdal.Open(self.dest)
        exp_path = join(GAMMA_TEST_DIR, '16x20_20090713-20090817_VV_4rlks_utm.tif')
        exp_ds = gdal.Open(exp_path)

        # compare data and geographic headers
        assert_array_almost_equal(exp_ds.ReadAsArray(), ds.ReadAsArray())
        self.compare_rasters(ds, exp_ds)

        md = ds.GetMetadata()
        assert len(md) == 32  # 32 metadata items
        assert md[ifc.FIRST_DATE] == str(date(2009, 7, 13))
        assert md[ifc.SECOND_DATE] == str(date(2009, 8, 17))
        assert md[ifc.PYRATE_TIME_SPAN] == str(35 / ifc.DAYS_PER_YEAR)
        # TODO: the acquisition-time metadata checks are disabled because
        # they were failing; re-enable once the expected values are settled.

        wavelen = float(md[ifc.PYRATE_WAVELENGTH_METRES])
        assert wavelen == pytest.approx(0.05627457792190739)

    def test_to_geotiff_wrong_input_data(self):
        """Passing a TIF instead of a UNW file as raw data must be rejected."""
        self.dest = os.path.join(TEMPDIR, 'tmp_gamma_ifg.tif')
        data_path = join(GAMMA_TEST_DIR, '16x20_20090713-20090817_VV_4rlks_utm.tif')
        with pytest.raises(GeotiffException):
            write_fullres_geotiff(self.COMBINED, data_path, self.dest, nodata=0)

    def compare_rasters(self, ds, exp_ds):
        """Assert that two opened datasets agree on their geographic headers.

        :param ds: dataset under test.
        :param exp_ds: reference dataset.

        Checks band 1's nodata value, the projection string and the
        geotransform (each coefficient to within 1e-4).
        """
        band = ds.GetRasterBand(1)
        exp_band = exp_ds.GetRasterBand(1)

        nodata = band.GetNoDataValue()
        # ``~(nodata is None)`` is -1 or -2, both truthy, so it could never
        # fail; test the identity directly.
        assert nodata is not None
        assert exp_band.GetNoDataValue() == nodata

        pj = ds.GetProjection()
        assert 'WGS 84' in pj
        assert exp_ds.GetProjection() == pj
        for exp, act in zip(exp_ds.GetGeoTransform(), ds.GetGeoTransform()):
            assert exp == pytest.approx(act, abs=0.0001)

    def test_bad_projection(self):
        """An unknown datum in the header must raise ``GeotiffException``."""
        hdr = self.DEM_HDR.copy()
        hdr[ifc.PYRATE_DATUM] = 'nonexistent projection'
        data_path = join(GAMMA_TEST_DIR, 'dem16x20raw.dem')
        self.dest = os.path.join(TEMPDIR, 'tmp_gamma_dem2.tif')
        with pytest.raises(GeotiffException):
            write_fullres_geotiff(hdr, data_path, self.dest, nodata=0)
class TestGammaHeaderListRaiseException:
    """Test to make sure PyRate raises an exception when the IFG header list
    contains more than two files."""

    def setup_method(self):
        """Prepare the DEM header path and a list of four epoch header paths."""
        epoch_names = ('r20090713_VV.slc.par', 'r20090817_VV.slc.par')
        self.demHeaderString = f'{GAMMA_TEST_DIR}/dem16x20raw.dem.par'
        # the same pair of headers listed twice gives four entries
        self.headerList = [f'{GAMMA_TEST_DIR}/{name}' for name in epoch_names * 2]

    def test_exception(self):
        """``manage_headers`` must raise ``GammaException`` for this list."""
        with pytest.raises(gamma.GammaException):
            gamma.manage_headers(self.demHeaderString, self.headerList)
class TestGammaHeaderParsingTests:
    """Tests conversion of GAMMA headers to Py dicts"""

    def test_parse_gamma_epoch_header(self):
        """The epoch header yields the expected date, wavelength and incidence."""
        # minimal required headers are:
        # date:      2009  7 13
        # radar_frequency:  5.3310040e+09   Hz
        parsed = gamma.parse_epoch_header(join(GAMMA_TEST_DIR, 'r20090713_VV.slc.par'))

        assert parsed[ifc.FIRST_DATE] == date(2009, 7, 13)
        assert parsed[ifc.PYRATE_WAVELENGTH_METRES] == LIGHTSPEED / 5.3310040e+09
        assert parsed[ifc.PYRATE_INCIDENCE_DEGREES] == 22.9671

    def test_parse_gamma_dem_header(self):
        """The DEM header yields the expected raster size, origin and steps."""
        parsed = gamma.parse_dem_header(join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par'))
        expected = (
            (ifc.PYRATE_NCOLS, 16),
            (ifc.PYRATE_NROWS, 20),
            (ifc.PYRATE_LAT, -33.3831945),
            (ifc.PYRATE_LONG, 150.3870833),
            (ifc.PYRATE_X_STEP, 6.9444445e-05),
            (ifc.PYRATE_Y_STEP, -6.9444445e-05),
        )
        for key, value in expected:
            self.assert_equal(parsed[key], value)

    @staticmethod
    def assert_equal(arg1, arg2):
        """Assert that the two arguments compare equal."""
        assert arg1 == arg2
# Test data for the epoch header combination tests.
# H0 is the earlier epoch (2009-07-13).
H0 = {ifc.FIRST_DATE: date(2009, 7, 13),
ifc.FIRST_TIME: time(12),
ifc.PYRATE_WAVELENGTH_METRES: 1.8,
ifc.PYRATE_INCIDENCE_DEGREES: 35.565,
}
# H1 is the later epoch (2009-08-17) with the same wavelength as H0.
H1 = {ifc.FIRST_DATE: date(2009, 8, 17),
ifc.FIRST_TIME: time(12, 10, 10),
ifc.PYRATE_WAVELENGTH_METRES: 1.8,
ifc.PYRATE_INCIDENCE_DEGREES: 35.56,
}
# Later epoch whose wavelength (2.4) differs from H0's (1.8); paired with H0
# in test_fail_mismatching_wavelength.
H1_ERR1 = {ifc.FIRST_DATE: date(2009, 8, 17),
ifc.FIRST_TIME: time(12),
ifc.PYRATE_WAVELENGTH_METRES: 2.4,
ifc.PYRATE_INCIDENCE_DEGREES: 35.56,
}
# Later epoch whose incidence angle (35.76) differs from H0's (35.565);
# paired with H0 in test_fail_mismatching_incidence.
H1_ERR2 = {ifc.FIRST_DATE: date(2009, 8, 17),
ifc.FIRST_TIME: time(12),
ifc.PYRATE_WAVELENGTH_METRES: 1.8,
ifc.PYRATE_INCIDENCE_DEGREES: 35.76,
}
class TestHeaderCombination:
    """Tests GAMMA epoch and DEM headers can be combined into a single Py dict"""

    def setup_method(self):
        """Parse the DEM and baseline headers shared by every test."""
        self.err = gamma.GammaException
        self.dh = gamma.parse_dem_header(join(GAMMA_TEST_DIR, 'dem16x20raw.dem.par'))
        self.bh = gamma.parse_baseline_header(
            join(GAMMA_TEST_DIR, '20160114-20160126_base.par'))

    @staticmethod
    def assert_equal(arg1, arg2):
        """Assert that the two arguments compare equal."""
        assert arg1 == arg2

    def test_combine_headers(self):
        """Two real epoch headers combine into the expected dates and values."""
        names = ['r20090713_VV.slc.par', 'r20090817_VV.slc.par']
        first, second = (gamma.parse_epoch_header(join(GAMMA_TEST_DIR, n)) for n in names)
        combined = gamma.combine_headers(first, second, self.dh, self.bh)

        # 18 days left in July after the 13th, plus 17 days of August
        self.assert_equal(combined[ifc.PYRATE_TIME_SPAN], (18 + 17) / ifc.DAYS_PER_YEAR)
        self.assert_equal(combined[ifc.FIRST_DATE], date(2009, 7, 13))
        self.assert_equal(combined[ifc.SECOND_DATE], date(2009, 8, 17))
        self.assert_equal(combined[ifc.PYRATE_WAVELENGTH_METRES],
                          LIGHTSPEED / 5.3310040e+09)

    def test_fail_non_dict_header(self):
        """A non-dict epoch or DEM header must be rejected."""
        bad_argument_sets = (
            (H0, '', self.dh, self.bh),
            ('', H0, self.dh, self.bh),
            (H0, H1, None, self.bh),
            (H0, H1, '', self.bh),
        )
        for arguments in bad_argument_sets:
            self.assertRaises(gamma.combine_headers, *arguments)

    def test_fail_mismatching_wavelength(self):
        """Epoch headers with different wavelengths must be rejected."""
        self.assertRaises(gamma.combine_headers, H0, H1_ERR1, self.dh, self.bh)

    def test_fail_mismatching_incidence(self):
        """Epoch headers with different incidence angles must be rejected."""
        self.assertRaises(gamma.combine_headers, H0, H1_ERR2, self.dh, self.bh)

    def test_fail_same_date(self):
        """Combining a header with itself (same date) must be rejected."""
        self.assertRaises(gamma.combine_headers, H0, H0, self.dh, self.bh)

    def test_fail_bad_date_order(self):
        """Passing the later epoch first must be rejected."""
        with pytest.raises(self.err):
            gamma.combine_headers(H1, H0, self.dh, self.bh)

    def assertRaises(self, func, *args):
        """Call ``func(*args)`` and require it to raise ``self.err``."""
        with pytest.raises(self.err):
            func(*args)
# Glob patterns the fixtures use to collect the interferogram and coherence
# GeoTIFFs from the prepifg output directories.
ifg_glob_suffix = "*_ifg.tif"
coh_glob_suffix = "*_coh.tif"
@pytest.fixture(scope='module')
def parallel_ifgs(gamma_conf):
    """Yield the prepifg outputs produced with ``C.PARALLEL`` set to 1.

    Runs ``conv2tif`` and ``prepifg`` in a private temporary directory, then
    yields whatever ``small_data_setup`` builds from the resulting
    interferogram and coherence files. The temporary directory is removed
    once the module's tests have finished (or if the setup itself fails).
    """
    tdir = Path(tempfile.mkdtemp())
    try:
        params_p = manipulate_test_conf(gamma_conf, tdir)
        params_p[C.PARALLEL] = 1
        output_conf = tdir.joinpath('conf.conf')
        pyrate.configuration.write_config_file(params=params_p, output_conf_file=output_conf)
        # re-read the written file so the run uses a fully validated config
        params_p = Configuration(output_conf).__dict__

        conv2tif.main(params_p)
        prepifg.main(params_p)

        parallel_df = list(Path(params_p[C.INTERFEROGRAM_DIR]).glob(ifg_glob_suffix))
        parallel_coh_files = list(Path(params_p[C.COHERENCE_DIR]).glob(coh_glob_suffix))

        yield small_data_setup(datafiles=parallel_df + parallel_coh_files)
    finally:
        # previously the scratch directory was left behind on every run
        shutil.rmtree(tdir, ignore_errors=True)
@pytest.fixture(scope='module')
def series_ifgs(gamma_conf):
    """Yield the prepifg outputs produced with ``C.PARALLEL`` set to 0.

    Runs ``conv2tif`` and ``prepifg`` in a private temporary directory, then
    yields whatever ``small_data_setup`` builds from the resulting
    interferogram and coherence files. The temporary directory is removed
    once the module's tests have finished (or if the setup itself fails).
    """
    print('======================setup series==========================')
    tdir = Path(tempfile.mkdtemp())
    try:
        params_s = manipulate_test_conf(gamma_conf, tdir)
        params_s[C.PARALLEL] = 0
        output_conf = tdir.joinpath('conf.conf')
        pyrate.configuration.write_config_file(params=params_s, output_conf_file=output_conf)
        # re-read the written file so the run uses a fully validated config
        params_s = Configuration(output_conf).__dict__

        conv2tif.main(params_s)
        prepifg.main(params_s)

        serial_ifgs = list(Path(params_s[C.INTERFEROGRAM_DIR]).glob(ifg_glob_suffix))
        coh_files = list(Path(params_s[C.COHERENCE_DIR]).glob(coh_glob_suffix))

        yield small_data_setup(datafiles=serial_ifgs + coh_files)
    finally:
        # previously the scratch directory was left behind on every run
        shutil.rmtree(tdir, ignore_errors=True)
def test_equality(series_ifgs, parallel_ifgs):
    """Serial and parallel prepifg runs must produce the same phase data.

    :param series_ifgs: outputs of the serial run (``series_ifgs`` fixture).
    :param parallel_ifgs: outputs of the parallel run (``parallel_ifgs`` fixture).
    """
    series = list(series_ifgs)
    parallel = list(parallel_ifgs)
    # zip() stops at the shorter input, so without these checks an empty or
    # truncated result set would make the test pass without comparing anything.
    assert series, 'serial run produced no outputs'
    assert len(series) == len(parallel)
    for s, p in zip(series, parallel):
        np.testing.assert_array_almost_equal(s.phase_data, p.phase_data)
def test_meta_data_exists(series_ifgs, parallel_ifgs):
    """Serial and parallel outputs carry identical, correctly typed metadata.

    :param series_ifgs: outputs of the serial run (``series_ifgs`` fixture).
    :param parallel_ifgs: outputs of the parallel run (``parallel_ifgs`` fixture).
    """
    # Start at zero so that empty fixtures fail the count assertion below
    # instead of raising NameError on an unbound loop variable.
    n_checked = 0
    for n_checked, (s, p) in enumerate(zip(series_ifgs, parallel_ifgs), start=1):
        # all metadata equal
        assert s.meta_data == p.meta_data
        # test that DATA_TYPE exists in metadata
        assert ifc.DATA_TYPE in s.meta_data
        # test that DATA_TYPE is one of the two multilooked types
        assert s.meta_data[ifc.DATA_TYPE] in (ifc.MULTILOOKED, ifc.MULTILOOKED_COH)

    # the test data set is expected to yield 34 output files in total
    assert n_checked == 34
class TestGammaBaselineRead:
    """Tests the reading of initial and precise baselines"""

    def setup_method(self):
        """Parse the two baseline header files used by the tests."""
        self.init = gamma.parse_baseline_header(
            join(GAMMA_TEST_DIR, '20160114-20160126_base_init.par'))
        self.prec = gamma.parse_baseline_header(
            join(GAMMA_TEST_DIR, '20160114-20160126_base.par'))

    def test_prec_baseline_read(self):
        """Test that the Precise baseline values are being read"""
        initial_values = {
            'BASELINE_T': -0.0000026,
            'BASELINE_C': -103.7427072,
            'BASELINE_N': 2.8130731,
            'BASELINE_RATE_T': 0.0,
            'BASELINE_RATE_C': -0.0173538,
            'BASELINE_RATE_N': -0.0055098,
        }
        precise_values = {
            'BASELINE_T': 0.0,
            'BASELINE_C': -103.8364725,
            'BASELINE_N': 2.8055662,
            'BASELINE_RATE_T': 0.0,
            'BASELINE_RATE_C': -0.0182215,
            'BASELINE_RATE_N': -0.0065402,
        }

        # the '_base.par' header must parse to the precise values only
        assert self.prec != initial_values
        assert self.prec == precise_values

        # the '_base_init.par' header must match neither set
        assert self.init != initial_values
        assert self.init != precise_values

    def test_init_baseline_read(self):
        """Test that the Initial baseline values are being read"""
        initial_values = {
            'BASELINE_T': 0.6529765,
            'BASELINE_C': -103.9065694,
            'BASELINE_N': 2.9253896,
            'BASELINE_RATE_T': 0.0,
            'BASELINE_RATE_C': -0.0231786,
            'BASELINE_RATE_N': -0.0038703,
        }
        precise_values = {
            'BASELINE_T': 0.0,
            'BASELINE_C': 0.0,
            'BASELINE_N': 0.0,
            'BASELINE_RATE_T': 0.0,
            'BASELINE_RATE_C': 0.0,
            'BASELINE_RATE_N': 0.0,
        }

        # the '_base.par' header must match neither set
        assert self.prec != initial_values
        assert self.prec != precise_values

        # the '_base_init.par' header must parse to the initial values only
        assert self.init == initial_values
        assert self.init != precise_values
|