https://github.com/JasonGross/coq-tools
Raw File
Tip revision: a47ed37e05cc2f848d43cb70a627db5c518ba0c0 authored by Jason Gross on 12 July 2022, 18:21:59 UTC
Remove path sensitivity in traceback output, test 12
Tip revision: a47ed37
diagnose_error.py
from __future__ import with_statement, print_function
import os, sys, tempfile, subprocess, re, time, math, glob, threading, shutil, traceback
from Popen_noblock import Popen_async, Empty
from memoize import memoize
from file_util import clean_v_file
from util import re_escape
from custom_arguments import LOG_ALWAYS
from coq_version import group_coq_args, get_coqc_help
import util

__all__ = ["has_error", "get_error_line_number", "get_error_byte_locations", "make_reg_string", "get_coq_output", "get_error_string", "get_timeout", "reset_timeout", "reset_coq_output_cache", "is_timeout"]

ACTUAL_ERROR_REG_STRING = '(?!Warning)(?!The command has indeed failed with message:)' # maybe we should just require Error or Timeout?
DEFAULT_PRE_PRE_ERROR_REG_STRING = 'File "[^"]+", line ([0-9]+), characters [0-9-]+:\n'
DEFAULT_PRE_ERROR_REG_STRING = DEFAULT_PRE_PRE_ERROR_REG_STRING + ACTUAL_ERROR_REG_STRING
DEFAULT_PRE_ERROR_REG_STRING_WITH_BYTES = 'File "[^"]+", line ([0-9]+), characters ([0-9]+)-([0-9]+):\n' + ACTUAL_ERROR_REG_STRING
DEFAULT_ERROR_REG_STRING = DEFAULT_PRE_ERROR_REG_STRING + '((?:.|\n)+)'
DEFAULT_ERROR_REG_STRING_WITH_BYTES = DEFAULT_PRE_ERROR_REG_STRING_WITH_BYTES + '((?:.|\n)+)'
DEFAULT_ERROR_REG_STRING_GENERIC = DEFAULT_PRE_PRE_ERROR_REG_STRING + '(%s)'

def clean_output(output):
    return util.normalize_newlines(output)

@memoize
def get_coq_accepts_fine_grained_debug(coqc, debug_kind):
    temp_file = tempfile.NamedTemporaryFile(suffix='.v', dir='.', delete=True)
    temp_file_name = temp_file.name
    p = subprocess.Popen([coqc, "-q", "-d", debug_kind, temp_file_name], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
    (stdout, stderr) = p.communicate()
    temp_file.close()
    clean_v_file(temp_file_name)
    return 'Unknown option -d' not in util.s(stdout) and '-d: no such file or directory' not in util.s(stdout) and 'There is no debug flag' not in util.s(stdout)

def get_coq_debug_native_compiler_args(coqc):
    if get_coq_accepts_fine_grained_debug(coqc, "native-compiler"): return ["-d", "native-compiler"]
    return ["-debug"]

@memoize
def get_error_match(output, reg_string=DEFAULT_ERROR_REG_STRING, pre_reg_string=DEFAULT_PRE_ERROR_REG_STRING):
    """Returns the final match of reg_string"""
    locations = [0] + [m.start() for m in re.finditer(pre_reg_string, output)]
    reg = re.compile(reg_string)
    results = (reg.search(output[start_loc:]) for start_loc in reversed(locations))
    for result in results:
        if result: return result
    return None

@memoize
def has_error(output, reg_string=DEFAULT_ERROR_REG_STRING, pre_reg_string=DEFAULT_PRE_ERROR_REG_STRING):
    """Returns True if the coq output encoded in output has an error
    matching the given regular expression, False otherwise.
    """
    errors = get_error_match(output, reg_string=reg_string, pre_reg_string=pre_reg_string)
    if errors:
        return True
    else:
        return False

TIMEOUT_POSTFIX = '\nTimeout! (external)'

@memoize
def is_timeout(output):
    """Returns True if the output was killed with a timeout, False otherwise"""
    return output.endswith(TIMEOUT_POSTFIX)

@memoize
def get_error_line_number(output, reg_string=DEFAULT_ERROR_REG_STRING, pre_reg_string=DEFAULT_PRE_ERROR_REG_STRING):
    """Returns the line number that the error matching reg_string
    occured on.

    Precondition: has_error(output, reg_string)
    """
    errors = get_error_match(output, reg_string=reg_string, pre_reg_string=pre_reg_string)
    return int(errors.groups()[0])

@memoize
def get_error_byte_locations(output, reg_string=DEFAULT_ERROR_REG_STRING_WITH_BYTES, pre_reg_string=DEFAULT_PRE_ERROR_REG_STRING_WITH_BYTES):
    """Returns the byte locations that the error matching reg_string
    occured on.

    Precondition: has_error(output, reg_string)
    """
    errors = get_error_match(output, reg_string=reg_string, pre_reg_string=pre_reg_string)
    return (int(errors.groups()[1]), int(errors.groups()[2]))

@memoize
def get_error_string(output, reg_string=DEFAULT_ERROR_REG_STRING, pre_reg_string=DEFAULT_PRE_ERROR_REG_STRING):
    """Returns the error string of the error matching reg_string.

    Precondition: has_error(output, reg_string)
    """
    errors = get_error_match(output, reg_string=reg_string, pre_reg_string=pre_reg_string)
    return errors.groups()[1]

@memoize
def make_reg_string(output, strict_whitespace=False):
    """Returns a regular expression for matching the particular error
    in output.

    Precondition: has_error(output)
    """
    unstrictify_whitespace = (lambda s: s)
    if not strict_whitespace:
        unstrictify_whitespace = (lambda s: re.sub(r'(?:\\s\\+)+', r'\\s+', re.sub(r'(?:\\ )+', r'\\s+', re.sub(r'(\\n|\n)(?:\\ )+', r'\\s+', s.replace('\\\n', '\n')))).replace('\n', '\s').replace(r'\s+\s', r'\s+').replace(r'\s++', r'\s+'))

    error_string = get_error_string(output).strip()
    if not util.PY3: error_string = error_string.decode('utf-8')
    if 'Anomaly' in error_string and re.search(r'Constant [^\s]+\s+does not appear in the environment', error_string):
        re_string = re.sub(r'(Constant\\ )[^\s]+(\\ )',
                           r'\1[^\\s]+\2',
                           re_escape(error_string))
    elif 'Anomaly' in error_string and re.search(r'Universe [^\s]+ undefined', error_string):
        re_string = re.sub(r'(Universe\\ )[^\s]+(\\ )',
                           r'\1[^\\s]+\2',
                           re_escape(error_string))
    elif 'Universe inconsistency' in error_string or 'universe inconsistency' in error_string:
        re_string = re.sub(r'([Uu]niverse\\ inconsistency.*) because(.|\n)*',
                           r'\1 because.*',
                           re_escape(error_string))
        re_string = re.sub(r'(\s)[^\s]+?\.([0-9]+)',
                           r'\1[^\\s]+?\\.\2',
                           re_string)
    elif 'Unsatisfied constraints' in error_string:
        re_string = re.sub(r'Error:\\ Unsatisfied\\ constraints:.*(?:\n.+)*.*\\\(maybe\\ a\\ bugged\\ tactic\\\)',
                           r'Error:\\ Unsatisfied\\ constraints:.*(?:\\n.+)*.*\\(maybe\\ a\\ bugged\\ tactic\\)',
                           re_escape(error_string),
                           re.DOTALL)
    elif re.search(r'Universe [^ ]* is unbound', error_string):
        re_string = re.sub(r'Universe\\ [^ ]*\\ is\\ unbound',
                           r'Universe\\ [^ ]*\\ is\\ unbound',
                           re_escape(error_string))
    else:
        re_string = re_escape(error_string)
    re_string = re.sub(r'tmp(?:[A-Za-z_\d]|\\_)+',
                       r'[A-Za-z_\\d\\.]+',
                       re_string)
    if r'Universe\ instance\ should\ have\ length\ ' not in re_string:
        re_string = re.sub(r'[\d]+',
                           r'[\\d]+',
                           re_string)
    ret = DEFAULT_ERROR_REG_STRING_GENERIC % unstrictify_whitespace(re_string)
    if not util.PY3: ret = ret.encode('utf-8')
    return ret

TIMEOUT = {}

def get_timeout(coqc=None):
    return TIMEOUT.get(coqc)

def set_timeout(key, value, **kwargs):
    kwargs['log']('\nThe timeout for %s has been set to: %d' % (key, value))
    TIMEOUT[key] = value

def reset_timeout():
    global TIMEOUT
    TIMEOUT = {}

def timeout_Popen_communicate(log, *args, **kwargs):
    ret = { 'value' : ('', ''), 'returncode': None }
    timeout = kwargs.get('timeout')
    del kwargs['timeout']
    input_val = kwargs.get('input')
    if input_val is not None: input_val = input_val.encode('utf-8')
    del kwargs['input']
    p = subprocess.Popen(*args, **kwargs)

    def target():
        ret['value'] = tuple(map(util.s, p.communicate(input=input_val)))
        ret['returncode'] = p.returncode

    thread = threading.Thread(target=target)
    thread.start()

    thread.join(timeout)
    if not thread.is_alive():
        return (ret['value'], ret['returncode'])

    p.terminate()
    thread.join()
    return (tuple(map((lambda s: (s if s else '') + TIMEOUT_POSTFIX), ret['value'])), ret['returncode'])


def memory_robust_timeout_Popen_communicate(log, *args, **kwargs):
    while True:
        try:
            return timeout_Popen_communicate(log, *args, **kwargs)
        except OSError as e:
            log('Warning: subprocess.Popen%s%s failed with %s\nTrying again in 10s' % (repr(tuple(args)), repr(kwargs), repr(e)), force_stdout=True, level=LOG_ALWAYS)
            time.sleep(10)

COQ_OUTPUT = {}

def sanitize_cmd(cmd):
    return re.sub(r'("/tmp/tmp)[^/"]*?((?:/[^"]*?)?(?:\.v)?")', r'\1XXXXXXXX\2', cmd)

def get_filepath_of_coq_args(coqc_prog, coqc_prog_args, **kwargs):
    coqc_help = get_coqc_help(coqc_prog, **kwargs)
    for arg_group in group_coq_args(coqc_prog_args, coqc_help):
        if arg_group[0] == '-top':
            ret = arg_group[1].split('.')
            return ret[:-1], ret[-1] + '.v'
    return None, None

prepare_cmds_for_coq_output_printed_cmd_already = set()
def prepare_cmds_for_coq_output(coqc_prog, coqc_prog_args, contents, cwd=None, timeout_val=0, **kwargs):
    def make_rmtree_onerror(file_name):
        def rmtree_onerror(function, path, exc_info):
            kwargs['log']('Non-fatal error encountered when cleaning up %s:\n' % file_name)
            etype, value, tb = exc_info
            if hasattr(traceback, 'TracebackException'):
                kwargs['log'](''.join(traceback.TracebackException(type(value), value, tb, capture_locals=True).format()))
            else:
                kwargs['log'](traceback.format_exception(type(value), value, tb))
        return rmtree_onerror

    key = (coqc_prog, tuple(coqc_prog_args), kwargs['pass_on_stdin'], contents, timeout_val, cwd)
    cmds = [coqc_prog] + list(coqc_prog_args)
    if key in COQ_OUTPUT.keys():
        file_name = COQ_OUTPUT[key][0]
        cleaner = lambda: None
    else:
        intermediate_dirs, topfilename = get_filepath_of_coq_args(coqc_prog, coqc_prog_args, **kwargs)
        if topfilename is None:
            with tempfile.NamedTemporaryFile(suffix='.v', delete=False, mode='wb') as f:
                f.write(contents.encode('utf-8'))
            file_name = f.name
            cleaner = lambda: clean_v_file(file_name)
        else:
            temp_dir_name = tempfile.mkdtemp()
            file_path = os.path.join(temp_dir_name, *intermediate_dirs)
            if not os.path.exists(file_path): os.makedirs(file_path)
            file_name = os.path.join(file_path, topfilename)
            with open(file_name, mode='wb') as f:
                f.write(contents.encode('utf-8'))
            cleaner = lambda: shutil.rmtree(temp_dir_name, onerror=make_rmtree_onerror(file_name))
            cmds.extend(['-R', temp_dir_name, '']) # make sure we bind the entire tree; use -R for compat with 8.4

    file_name_root = os.path.splitext(file_name)[0]

    pseudocmds = ''
    input_val = None
    if kwargs['is_coqtop']:
        if kwargs['pass_on_stdin']:
            input_val = contents
            cmds.extend(['-q'])
            pseudocmds = '" < "%s' % file_name
        else:
            cmds.extend(['-load-vernac-source', file_name_root, '-q'])
    else:
        cmds.extend([file_name, '-q'])
    cmd_to_print = '"%s%s"' % ('" "'.join(cmds), pseudocmds)
    kwargs['log']('\nRunning command: %s' % cmd_to_print,
                  level=(kwargs['verbose_base']
                         - (1 if sanitize_cmd(cmd_to_print) not in prepare_cmds_for_coq_output_printed_cmd_already else 0)))
    prepare_cmds_for_coq_output_printed_cmd_already.add(sanitize_cmd(cmd_to_print))
    kwargs['log']('\nContents:\n%s\n' % contents,
                  level=kwargs['verbose_base']+1)
    return key, file_name, cmds, input_val, cleaner

def reset_coq_output_cache(coqc_prog, coqc_prog_args, contents, timeout_val, cwd=None, is_coqtop=False, pass_on_stdin=False, verbose_base=1, **kwargs):
    key, file_name, cmds, input_val, cleaner = prepare_cmds_for_coq_output(coqc_prog, coqc_prog_args, contents, cwd=cwd, timeout_val=timeout_val, is_coqtop=is_coqtop, pass_on_stdin=pass_on_stdin, verbose_base=verbose_base, **kwargs)
    cleaner()

    if key in COQ_OUTPUT.keys(): del COQ_OUTPUT[key]

def get_coq_output(coqc_prog, coqc_prog_args, contents, timeout_val, cwd=None, is_coqtop=False, pass_on_stdin=False, verbose_base=1, retry_with_debug_when=(lambda output: 'is not a compiled interface for this version of OCaml' in output), **kwargs):
    """Returns the coqc output of running through the given
    contents.  Pass timeout_val = None for no timeout."""
    if timeout_val is not None and timeout_val < 0 and get_timeout(coqc_prog) is not None:
        return get_coq_output(coqc_prog, coqc_prog_args, contents, get_timeout(coqc_prog), cwd=cwd, is_coqtop=is_coqtop, pass_on_stdin=pass_on_stdin, verbose_base=verbose_base, retry_with_debug_when=retry_with_debug_when, **kwargs)

    key, file_name, cmds, input_val, cleaner = prepare_cmds_for_coq_output(coqc_prog, coqc_prog_args, contents, cwd=cwd, timeout_val=timeout_val, is_coqtop=is_coqtop, pass_on_stdin=pass_on_stdin, verbose_base=verbose_base, **kwargs)

    if key in COQ_OUTPUT.keys():
        cleaner()
        return COQ_OUTPUT[key][1]

    start = time.time()
    ((stdout, stderr), returncode) = memory_robust_timeout_Popen_communicate(kwargs['log'], cmds, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, stdin=subprocess.PIPE, timeout=(timeout_val if timeout_val is not None and timeout_val > 0 else None), input=input_val, cwd=cwd)
    finish = time.time()
    runtime = finish - start
    kwargs['log']('\nretcode: %d\nstdout:\n%s\n\nstderr:\n%s\nruntime:\n%f\n\n' % (returncode, util.s(stdout), util.s(stderr), runtime),
                  level=verbose_base + 1)
    if get_timeout(coqc_prog) is None and timeout_val is not None:
        set_timeout(coqc_prog, 3 * max((1, int(math.ceil(finish - start)))), **kwargs)
    cleaner()
    COQ_OUTPUT[key] = (file_name, (clean_output(util.s(stdout)), tuple(cmds), returncode, runtime))
    kwargs['log']('Storing result: COQ_OUTPUT[%s]:\n%s' % (repr(key), repr(COQ_OUTPUT[key])),
                  level=verbose_base + 2)
    if retry_with_debug_when(COQ_OUTPUT[key][1][0]):
        debug_args = get_coq_debug_native_compiler_args(coqc_prog)
        kwargs['log']('Retrying with %s...' % ' '.join(debug_args),
                      level=verbose_base - 1)
        return get_coq_output(coqc_prog, list(debug_args) + list(coqc_prog_args), contents, timeout_val, cwd=cwd, is_coqtop=is_coqtop, pass_on_stdin=pass_on_stdin, verbose_base=verbose_base, retry_with_debug_when=(lambda output: False), **kwargs)
    return COQ_OUTPUT[key][1]
back to top