Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

Revision e326b29679d1df47e852eba76b361aed43210666 authored by Matt Garthwaite on 03 August 2020, 23:09:55 UTC, committed by GitHub on 03 August 2020, 23:09:55 UTC
Merge pull request #283 from GeoscienceAustralia/develop
Release 0.4.3
2 parents f914bf5 + 313483f
  • Files
  • Changes
  • 4d41853
  • /
  • pyrate
  • /
  • configuration.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • revision
  • directory
  • content
revision badge
swh:1:rev:e326b29679d1df47e852eba76b361aed43210666
directory badge
swh:1:dir:1f63451f83ba78c2d2a6728e8e7f5aed29711a07
content badge
swh:1:cnt:9e120db741b9d358349c0fdd00c7515aecce22a1

This interface makes it possible to generate software citations, provided that the root directory of the browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • revision
  • directory
  • content
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
configuration.py
#   This Python module is part of the PyRate software package.
#
#   Copyright 2020 Geoscience Australia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
This Python module contains utilities to validate user input parameters
parsed in a PyRate configuration file.
"""
from configparser import ConfigParser
from pathlib import Path, PurePath
from typing import Union
from pyrate.constants import NO_OF_PARALLEL_PROCESSES
from pyrate.default_parameters import PYRATE_DEFAULT_CONFIGURATION
from pyrate.core.algorithm import factorise_integer
from pyrate.core.shared import extract_epochs_from_filename, InputTypes
from pyrate.core.config import parse_namelist, ConfigException, ORB_ERROR_DIR, TEMP_MLOOKED_DIR


def set_parameter_value(data_type, input_value, default_value, required, input_name):
    """
    Convert a raw configuration value to its declared type, or fall back to the default.

    :param data_type: callable used to convert the value (e.g. ``int``), or a
        value whose string form is ``"path"`` to request a ``pathlib.Path``
    :param input_value: raw value read from the configuration file; ``None``
        or an empty value means that nothing was supplied
    :param default_value: value returned when nothing was supplied
    :param required: when truthy, a missing value is an error
    :param input_name: parameter name, used only in the error message

    :return: the converted value, or ``default_value`` when nothing was supplied
    :raises ValueError: if the parameter is required and no value was supplied
    """
    # ``None`` is treated like an empty string instead of crashing in ``len()``.
    if input_value is None or len(input_value) < 1:
        if required:  # pragma: no cover
            raise ValueError("A required parameter is missing value in input configuration file: " + str(input_name))
        return default_value

    # Equality, not substring membership: ``str(data_type) in "path"`` was also
    # true for "", "p", "at", ... and would wrongly turn those into a Path.
    if str(data_type) == "path":
        return Path(input_value)
    return data_type(input_value)


def validate_parameter_value(input_name, input_value, min_value=None, max_value=None, possible_values=None):
    """
    Check a single parameter value against its optional constraints.

    The checks run in this order: path existence (only for path objects),
    lower bound, upper bound, membership in the allowed values.

    :param input_name: parameter name, used only in error messages
    :param input_value: value to check; bounds are skipped when it is ``None``
    :param min_value: smallest accepted value, or ``None`` for no lower bound
    :param max_value: largest accepted value, or ``None`` for no upper bound
    :param possible_values: container of accepted values, or ``None`` to
        accept anything

    :return: ``True`` when every applicable check passes
    :raises ValueError: on the first check that fails
    """
    if isinstance(input_value, PurePath) and not Path.exists(input_value):  # pragma: no cover
        raise ValueError(f"Given path: {input_value!s} does not exist.")

    if input_value is not None:
        if min_value is not None and input_value < min_value:  # pragma: no cover
            raise ValueError(
                f"Invalid value for {input_name!s} supplied: {input_value!s}. "
                f"Provide a value greater than or equal to {min_value!s}.")
        if max_value is not None and input_value > max_value:  # pragma: no cover
            raise ValueError(
                f"Invalid value for {input_name!s} supplied: {input_value!s}. "
                f"Provide a value less than or equal to {max_value!s}.")

    # The membership test is applied even when the value is None.
    if possible_values is not None and input_value not in possible_values:  # pragma: no cover
        raise ValueError(
            f"Invalid value for {input_name!s} supplied: {input_value!s}. "
            f"Provide a value from: {possible_values!s}.")
    return True


def validate_file_list_values(file_list, no_of_epochs):
    """
    Check every file named in a file list.

    Each entry returned by ``parse_namelist(file_list)`` must exist on disk and
    its name must yield at least ``no_of_epochs`` matches from
    ``extract_epochs_from_filename``. Entries are checked one at a time, so the
    first offending entry raises.

    :param file_list: the file list handed to ``parse_namelist``
    :param no_of_epochs: minimum number of epoch matches required per file name

    :raises ValueError: if ``file_list`` is ``None``
    :raises ConfigException: if an entry does not exist or has too few epochs
    """
    if file_list is None:  # pragma: no cover
        raise ValueError(f"No value supplied for input file list: {file_list!s}")

    for entry in parse_namelist(file_list):
        if not Path(entry).exists():  # pragma: no cover
            raise ConfigException(f"{entry} does not exist")

        epochs_found = extract_epochs_from_filename(filename_with_epochs=entry)
        if len(epochs_found) < no_of_epochs:  # pragma: no cover
            raise ConfigException(f"the number of epochs in {entry} names are less the required number: {no_of_epochs}")


class MultiplePaths:
    """
    The set of paths derived from one input file name.

    Attributes set by the constructor (all paths are POSIX-style strings):

    * ``input_type`` -- the ``input_type`` argument, stored unchanged
    * ``unwrapped_path`` -- the input file itself, or ``None`` when the input
      already has a ``.tif`` suffix
    * ``converted_path`` -- the input file itself for ``.tif`` input, otherwise
      a ``.tif`` file inside ``out_dir`` named from the input's stem, its
      suffix and ``input_type.value``
    * ``sampled_path`` -- a file whose name ends in
      ``_<ifglksx>rlks_<ifgcropopt>cr.tif``
    * ``tmp_sampled_path`` -- the sampled file name placed inside ``tempdir``
    """

    def __init__(self, out_dir: str, file_name: str, ifglksx: int = 1, ifgcropopt: int = 1,
                 input_type: InputTypes = InputTypes.IFG,  tempdir: Union[Path, str] = TEMP_MLOOKED_DIR):
        """
        :param out_dir: directory that receives the derived files
        :param file_name: path of the input file
        :param ifglksx: number inserted before ``rlks`` in the sampled file name
        :param ifgcropopt: number inserted before ``cr`` in the sampled file name
        :param input_type: kind of input; its ``value`` becomes part of the
            converted file name for non-``.tif`` input
        :param tempdir: directory for the temporary sampled file; anything that
            is not a ``Path`` is taken relative to ``out_dir``
        """
        self.input_type = input_type
        source = Path(file_name)
        # Name suffix shared by both branches below.
        looks_crop_tag = '_' + str(ifglksx) + "rlks_" + str(ifgcropopt) + "cr.tif"

        if source.suffix == ".tif":
            # Input is already a .tif file: there is no separate unwrapped file
            # and the "converted" file is the input itself.
            self.unwrapped_path = None
            converted = source
            sampled = Path(out_dir).joinpath(source.stem + looks_crop_tag)
        else:
            self.unwrapped_path = source.as_posix()
            leading_name = source.stem.split('.')[0]
            extension = source.suffix[1:]
            converted = Path(out_dir).joinpath(
                leading_name + '_' + extension + input_type.value).with_suffix('.tif')
            sampled = converted.with_name(converted.stem + looks_crop_tag)

        tmp_root = tempdir if isinstance(tempdir, Path) else Path(out_dir).joinpath(tempdir)

        self.tmp_sampled_path = tmp_root.joinpath(sampled.name).as_posix()
        self.sampled_path = sampled.as_posix()
        self.converted_path = converted.as_posix()

    def __str__(self):  # pragma: no cover
        """Return a multi-line listing of the four stored paths."""
        pad = " " * 12
        unwrapped = "None" if self.unwrapped_path is None else self.unwrapped_path
        # The whitespace in the separators reproduces the original layout exactly.
        pieces = (
            "\nunwrapped_path = ", unwrapped,
            "\n" + pad + "converted_path = ", self.converted_path,
            " \n" + pad + "sampled_path = ", self.sampled_path,
            "    \n" + pad + "tmp_sampled_path = ", self.tmp_sampled_path,
            "\n" + pad,
        )
        return "".join(pieces)


class Configuration:
    """
    Parameters of a run, loaded from a configuration file.

    Every option found in the file becomes an instance attribute. Parameters
    listed in ``PYRATE_DEFAULT_CONFIGURATION`` are converted with
    ``set_parameter_value`` and checked with ``validate_parameter_value``.
    The constructor also creates the output, temporary, orbital-error and
    temporary multilooked directories, and builds ``MultiplePaths`` objects for
    the listed input files.
    """

    def __init__(self, config_file_path):
        """
        :param config_file_path: path of the configuration file to read

        :raises ValueError: if required parameters are missing from the file or
            a parameter value fails validation
        """
        attributes = vars(self)  # the instance's own attribute mapping

        parser = ConfigParser()
        # keep option names exactly as written instead of lower-casing them
        parser.optionxform = str
        # mimic header to fulfil the requirement for configparser
        with open(config_file_path) as stream:
            raw_text = stream.read()
        parser.read_string("[root]\n" + raw_text)

        attributes.update(parser["root"].items())

        # make output path, if not provided will error
        Path(self.outdir).mkdir(exist_ok=True, parents=True)

        # custom process sequence if 'process' section is provided in config
        if 'process' in parser and 'steps' in parser['process']:
            step_lines = parser['process'].get('steps').splitlines()
            attributes['process'] = [line for line in step_lines if line]
        else:
            attributes['process'] = [
                'orbfit',
                'refphase',
                'mst',
                'apscorrect',
                'maxvar',
                'timeseries',
                'stack'
            ]

        # Validate required parameters exist.
        required = {name for name, spec in PYRATE_DEFAULT_CONFIGURATION.items() if spec['Required']}
        missing = required.difference(attributes)
        if missing:  # pragma: no cover
            raise ValueError("Required configuration parameters: " + str(missing)
                             + " are missing from input config file.")

        # handle control parameters
        for parameter_name, spec in PYRATE_DEFAULT_CONFIGURATION.items():
            # parameters absent from the file are passed on as an empty string
            raw_value = attributes.get(parameter_name, '')
            attributes[parameter_name] = set_parameter_value(
                spec["DataType"], raw_value, spec["DefaultValue"], spec["Required"], parameter_name)
            validate_parameter_value(
                parameter_name, attributes[parameter_name],
                spec["MinValue"], spec["MaxValue"], spec["PossibleValues"])

        # bespoke parameter validation
        # Configuration parameters refchipsize must be odd
        # values too large (>101) will slow down the process without significant gains in results.
        if self.refchipsize % 2 != 1 and self.refchipsize - 1 > 1:  # pragma: no cover
            self.refchipsize -= 1

        # calculate rows and cols if not supplied
        if not (hasattr(self, 'rows') and hasattr(self, 'cols')):
            self.rows, self.cols = [int(factor) for factor in factorise_integer(NO_OF_PARALLEL_PROCESSES)]
        else:
            self.rows, self.cols = int(self.rows), int(self.cols)

        # create a temporary directory if not supplied
        if hasattr(self, 'tmpdir'):
            self.tmpdir = Path(self.tmpdir)
        else:
            self.tmpdir = Path(self.outdir).joinpath("tmpdir")
        self.tmpdir.mkdir(parents=True, exist_ok=True)

        # create orbfit error dir
        self.orb_error_dir = Path(self.outdir).joinpath(ORB_ERROR_DIR)
        self.orb_error_dir.mkdir(parents=True, exist_ok=True)

        # create temp multilooked files dir
        self.temp_mlooked_dir = Path(self.outdir).joinpath(TEMP_MLOOKED_DIR)
        self.temp_mlooked_dir.mkdir(parents=True, exist_ok=True)

        # var no longer used
        self.APS_ELEVATION_EXT = None
        self.APS_INCIDENCE_EXT = None
        self.apscorrect = 0
        self.apsmethod = 0
        self.elevationmap = None
        self.incidencemap = None

        # define parallel processes that will run
        self.NUMEXPR_MAX_THREADS = str(NO_OF_PARALLEL_PROCESSES)

        # Validate file names supplied in list exist and contain correct epochs in file names
        if self.cohfilelist is not None:
            # if self.processor != 0:  # not roipac
            validate_file_list_values(self.cohfilelist, 1)
            self.coherence_file_paths = self.__get_files_from_attr('cohfilelist', input_type=InputTypes.COH)

        self.header_file_paths = self.__get_files_from_attr('hdrfilelist', input_type=InputTypes.HEADER)

        self.interferogram_files = self.__get_files_from_attr('ifgfilelist')

        self.dem_file = MultiplePaths(self.outdir, self.demfile, self.ifglksx, self.ifgcropopt,
                                      input_type=InputTypes.DEM)

        # backward compatibility for string paths
        for key, value in list(attributes.items()):
            if isinstance(value, PurePath):
                attributes[key] = str(value)

    def __get_files_from_attr(self, attr, input_type=InputTypes.IFG):
        """
        Build one ``MultiplePaths`` per file named in the list stored in attribute ``attr``.

        :param attr: name of the instance attribute holding the file list
        :param input_type: input type passed on to every ``MultiplePaths``

        :return: list of ``MultiplePaths``, in the order given by ``parse_namelist``
        """
        collected = []
        for file_name in parse_namelist(getattr(self, attr)):
            collected.append(
                MultiplePaths(self.outdir, file_name, self.ifglksx, self.ifgcropopt, input_type=input_type,
                              tempdir=self.temp_mlooked_dir))
        return collected


def write_config_parser_file(conf: ConfigParser, output_conf_file: Union[str, Path]):
    """
    Write the contents of a ConfigParser instance to ``output_conf_file``.

    The file is opened in ``'w'`` mode, so an existing file is overwritten.

    :param conf: parser whose sections and options are written out
    :param output_conf_file: destination file path
    """
    with open(output_conf_file, 'w') as handle:
        conf.write(handle)
The diff you're trying to view is too large. Only the first 1000 changed files have been loaded.
Showing with 0 additions and 0 deletions (0 / 0 diffs computed)
swh spinner

Computing file changes ...

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API