Revision e326b29679d1df47e852eba76b361aed43210666 authored by Matt Garthwaite on 03 August 2020, 23:09:55 UTC, committed by GitHub on 03 August 2020, 23:09:55 UTC
Release 0.4.3
configuration.py
# This Python module is part of the PyRate software package.
#
# Copyright 2020 Geoscience Australia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This Python module contains utilities to validate user input parameters
parsed in a PyRate configuration file.
"""
from configparser import ConfigParser
from pathlib import Path, PurePath
from typing import Union
from pyrate.constants import NO_OF_PARALLEL_PROCESSES
from pyrate.default_parameters import PYRATE_DEFAULT_CONFIGURATION
from pyrate.core.algorithm import factorise_integer
from pyrate.core.shared import extract_epochs_from_filename, InputTypes
from pyrate.core.config import parse_namelist, ConfigException, ORB_ERROR_DIR, TEMP_MLOOKED_DIR
def set_parameter_value(data_type, input_value, default_value, required, input_name):
if len(input_value) < 1:
input_value = None
if required: # pragma: no cover
raise ValueError("A required parameter is missing value in input configuration file: " + str(input_name))
if input_value is not None:
if str(data_type) in "path":
return Path(input_value)
return data_type(input_value)
return default_value
def validate_parameter_value(input_name, input_value, min_value=None, max_value=None, possible_values=None):
if isinstance(input_value, PurePath):
if not Path.exists(input_value): # pragma: no cover
raise ValueError("Given path: " + str(input_value) + " does not exist.")
if input_value is not None:
if min_value is not None:
if input_value < min_value: # pragma: no cover
raise ValueError(
"Invalid value for " + str(input_name) + " supplied: " + str(input_value) + ". Provide a value greater than or equal to " + str(min_value) + ".")
if input_value is not None:
if max_value is not None:
if input_value > max_value: # pragma: no cover
raise ValueError(
"Invalid value for " + str(input_name) + " supplied: " + str(input_value) + ". Provide a value less than or equal to " + str(max_value) + ".")
if possible_values is not None:
if input_value not in possible_values: # pragma: no cover
raise ValueError(
"Invalid value for " + str(input_name) + " supplied: " + str(input_value) + ". Provide a value from: " + str(possible_values) + ".")
return True
def validate_file_list_values(file_list, no_of_epochs):
if file_list is None: # pragma: no cover
raise ValueError("No value supplied for input file list: " + str(file_list))
files = parse_namelist(file_list)
for f in files:
if not Path(f).exists(): # pragma: no cover
raise ConfigException(f"{f} does not exist")
else:
matches = extract_epochs_from_filename(filename_with_epochs=f)
if len(matches) < no_of_epochs: # pragma: no cover
raise ConfigException(f"the number of epochs in {f} names are less the required number: {no_of_epochs}")
class MultiplePaths:
def __init__(self, out_dir: str, file_name: str, ifglksx: int = 1, ifgcropopt: int = 1,
input_type: InputTypes = InputTypes.IFG, tempdir: Union[Path, str] = TEMP_MLOOKED_DIR):
self.input_type = input_type
b = Path(file_name)
if b.suffix == ".tif":
self.unwrapped_path = None
converted_path = b # original file
self.sampled_path = Path(out_dir).joinpath(
b.stem + '_' + str(ifglksx) + "rlks_" + str(ifgcropopt) + "cr.tif")
else:
self.unwrapped_path = b.as_posix()
converted_path = Path(out_dir).joinpath(
b.stem.split('.')[0] + '_' + b.suffix[1:] + input_type.value).with_suffix('.tif')
self.sampled_path = converted_path.with_name(
converted_path.stem + '_' + str(ifglksx) + "rlks_" + str(ifgcropopt) + "cr.tif")
if not isinstance(tempdir, Path):
tempdir = Path(out_dir).joinpath(tempdir)
self.tmp_sampled_path = tempdir.joinpath(self.sampled_path.name).as_posix()
self.sampled_path = self.sampled_path.as_posix()
self.converted_path = converted_path.as_posix()
def __str__(self): # pragma: no cover
st = ""
if self.unwrapped_path is not None:
st += """\nunwrapped_path = """ + self.unwrapped_path
else:
st += """\nunwrapped_path = None"""
st += """
converted_path = """ + self.converted_path+"""
sampled_path = """ + self.sampled_path+"""
tmp_sampled_path = """ + self.tmp_sampled_path+"""
"""
return st
class Configuration:
def __init__(self, config_file_path):
parser = ConfigParser()
parser.optionxform = str
# mimic header to fulfil the requirement for configparser
with open(config_file_path) as stream:
parser.read_string("[root]\n" + stream.read())
for key, value in parser["root"].items():
self.__dict__[key] = value
# make output path, if not provided will error
Path(self.outdir).mkdir(exist_ok=True, parents=True)
# custom process sequence if 'process' section is provided in config
if 'process' in parser and 'steps' in parser['process']:
self.__dict__['process'] = list(filter(None, parser['process'].get('steps').splitlines()))
else:
self.__dict__['process'] = [
'orbfit',
'refphase',
'mst',
'apscorrect',
'maxvar',
'timeseries',
'stack'
]
# Validate required parameters exist.
required = {k for k, v in PYRATE_DEFAULT_CONFIGURATION.items() if v['Required']}
if not required.issubset(self.__dict__): # pragma: no cover
raise ValueError("Required configuration parameters: " + str(
required.difference(self.__dict__)) + " are missing from input config file.")
# handle control parameters
for parameter_name in PYRATE_DEFAULT_CONFIGURATION:
param_value = self.__dict__[parameter_name] if parameter_name in required or \
parameter_name in self.__dict__ else ''
self.__dict__[parameter_name] = set_parameter_value(PYRATE_DEFAULT_CONFIGURATION[parameter_name]["DataType"],
param_value,
PYRATE_DEFAULT_CONFIGURATION[parameter_name]["DefaultValue"],
PYRATE_DEFAULT_CONFIGURATION[parameter_name]["Required"],
parameter_name)
validate_parameter_value(parameter_name, self.__dict__[parameter_name],
PYRATE_DEFAULT_CONFIGURATION[parameter_name]["MinValue"],
PYRATE_DEFAULT_CONFIGURATION[parameter_name]["MaxValue"],
PYRATE_DEFAULT_CONFIGURATION[parameter_name]["PossibleValues"])
# bespoke parameter validation
if self.refchipsize % 2 != 1: # pragma: no cover
if self.refchipsize - 1 > 1:
# Configuration parameters refchipsize must be odd
# values too large (>101) will slow down the process without significant gains in results.
self.refchipsize = self.refchipsize - 1
# calculate rows and cols if not supplied
if hasattr(self, 'rows') and hasattr(self, 'cols'):
self.rows, self.cols = int(self.rows), int(self.cols)
else:
self.rows, self.cols = [int(num) for num in factorise_integer(NO_OF_PARALLEL_PROCESSES)]
# create a temporary directory if not supplied
if not hasattr(self, 'tmpdir'):
self.tmpdir = Path(self.outdir).joinpath("tmpdir")
else:
self.tmpdir = Path(self.tmpdir)
self.tmpdir.mkdir(parents=True, exist_ok=True)
# create orbfit error dir
self.orb_error_dir = Path(self.outdir).joinpath(ORB_ERROR_DIR)
self.orb_error_dir.mkdir(parents=True, exist_ok=True)
# create temp multilooked files dir
self.temp_mlooked_dir = Path(self.outdir).joinpath(TEMP_MLOOKED_DIR)
self.temp_mlooked_dir.mkdir(parents=True, exist_ok=True)
# var no longer used
self.APS_ELEVATION_EXT = None
self.APS_INCIDENCE_EXT = None
self.apscorrect = 0
self.apsmethod = 0
self.elevationmap = None
self.incidencemap = None
# define parallel processes that will run
self.NUMEXPR_MAX_THREADS = str(NO_OF_PARALLEL_PROCESSES)
# Validate file names supplied in list exist and contain correct epochs in file names
if self.cohfilelist is not None:
# if self.processor != 0: # not roipac
validate_file_list_values(self.cohfilelist, 1)
self.coherence_file_paths = self.__get_files_from_attr('cohfilelist', input_type=InputTypes.COH)
self.header_file_paths = self.__get_files_from_attr('hdrfilelist', input_type=InputTypes.HEADER)
self.interferogram_files = self.__get_files_from_attr('ifgfilelist')
self.dem_file = MultiplePaths(self.outdir, self.demfile, self.ifglksx, self.ifgcropopt,
input_type=InputTypes.DEM)
# backward compatibility for string paths
for key in self.__dict__:
if isinstance(self.__dict__[key], PurePath):
self.__dict__[key] = str(self.__dict__[key])
def __get_files_from_attr(self, attr, input_type=InputTypes.IFG):
val = self.__getattribute__(attr)
files = parse_namelist(val)
return [
MultiplePaths(self.outdir, p, self.ifglksx, self.ifgcropopt, input_type=input_type,
tempdir=self.temp_mlooked_dir) for p in files
]
def write_config_parser_file(conf: ConfigParser, output_conf_file: Union[str, Path]):
"""replacement function for write_config_file which uses dict instead of a ConfigParser instance"""
with open(output_conf_file, 'w') as configfile:
conf.write(configfile)

Computing file changes ...