https://github.com/JasonGross/coq-tools
Tip revision: a47ed37e05cc2f848d43cb70a627db5c518ba0c0 authored by Jason Gross on 12 July 2022, 18:21:59 UTC
Remove path sensitivity in traceback output, test 12
Remove path sensitivity in traceback output, test 12
Tip revision: a47ed37
split_definitions_old.py
import re, time
from Popen_noblock import Popen_async, PIPE, STDOUT, Empty
__all__ = ["join_definitions", "split_statements_to_definitions"]
DEFAULT_VERBOSITY=1
def DEFAULT_LOG(text, level=DEFAULT_VERBOSITY):
if level <= DEFAULT_VERBOSITY:
print(text)
def get_all_nowait_iter(q):
try:
while True:
yield q.get_nowait()
except Empty:
pass
def get_all_nowait(q):
return ''.join(get_all_nowait_iter(q))
def get_all_semiwait_iter(q, log=DEFAULT_LOG):
def log_and_return(val):
log(val, level=5)
return val
try:
# this is blocking; TODO(jgross): Figure out how to get coqtop
# to tell us if it's finished computing
yield log_and_return(q.get(True))
while True:
yield log_and_return(q.get(True, 0.1))
except Empty:
pass
def get_all_semiwait(q, log=DEFAULT_LOG):
return ''.join(get_all_semiwait_iter(q, log=log))
def get_definitions_diff(previous_definition_string, new_definition_string):
"""Returns a triple of lists (definitions_removed,
definitions_shared, definitions_added)"""
old_definitions = [i for i in previous_definition_string.split('|') if i]
new_definitions = [i for i in new_definition_string.split('|') if i]
return (tuple(i for i in old_definitions if i not in new_definitions),
tuple(i for i in old_definitions if i in new_definitions),
tuple(i for i in new_definitions if i not in old_definitions))
#def split_coq_code_to_definitions(code, log=DEFAULT_LOG, coqtop='coqtop'):
# """Splits coq code into chunks which make up
# independent definitions/hints/etc, of the form
#
# {
# 'statements': <list of runnable statements>,
# 'statement': <entire chunk of code>,
# 'terms_defined': <tuple of terms defined by this chunk of code>,
# 'times': <a list of times for each statement>
# 'output': <a string of the response from coqtop>
# }"""
# p = Popen_async([coqtop, '-q', '-emacs', '-time'], stdout=PIPE, stderr=STDOUT, stdin=PIPE)
# time.sleep(1)
def split_statements_to_definitions(statements, log=DEFAULT_LOG, coqtop='coqtop', coqtop_args=tuple()):
"""Splits a list of statements into chunks which make up
independent definitions/hints/etc."""
p = Popen_async([coqtop, '-q', '-emacs'] + list(coqtop_args), stdout=PIPE, stderr=STDOUT, stdin=PIPE)
time.sleep(1)
prompt_reg = re.compile(r'<prompt>([^<]*?) < ([0-9]+) ([^<]*?) ([0-9]+) < ([^<]*?)</prompt>'.replace(' ', r'\s*'))
defined_reg = re.compile(r'^([^\s]+) is (?:defined|assumed)$', re.MULTILINE)
# aborted_reg = re.compile(r'^Current goal aborted$', re.MULTILINE)
# goal_reg = re.compile(r'^\s*=+\s*$', re.MULTILINE)
# goals and definitions are on stdout, prompts are on stderr
# clear stdout
get_all_semiwait(p.stdout, log=log)
# clear stderr
# get_all_nowait(p.stderr)
rtn = []
cur_definition = {}
last_definitions = '||'
cur_definition_names = '||'
for statement in statements:
if not statement.strip():
continue
log('Write: %s\n\nWait to read...' % statement, level=4)
p.stdin.write(statement + '\n\n')
p.stdin.flush()
stdout = get_all_semiwait(p.stdout, log=log)
stderr = stdout # ''.join(get_all_semiwait(p.stderr))
terms_defined = defined_reg.findall(prompt_reg.sub('', stdout))
# print((statement, stdout, terms_defined))
prompt_match = prompt_reg.search(stderr)
if not prompt_match or len(prompt_match.groups()) != 5:
if not prompt_match:
log('Likely fatal warning: I did not recognize the output from coqtop:')
log('stdout: %s\nstderr: %s' % (repr(stdout), repr(stderr)))
log("I will append the current statement (%s) to the list of definitions as-is, but I don't expect this to work." % statement)
else:
log("Crazy things are happening; the number of groups isn't what it should be (should be 5 groups):")
log("prompt_match.groups(): %s\nstdout: %s\nstderr: %s\nstatement: %s\n" % (repr(prompt_match.groups()), repr(stdout), repr(stderr), repr(statement)))
log((statement, terms_defined, cur_definition_names, cur_definition.get(cur_definition_names, [])), level=2)
if cur_definition_names.strip('|'):
cur_definition[cur_definition_names]['statements'].append(statement)
cur_definition[cur_definition_names]['terms_defined'] += terms_defined
else:
rtn.append({'statements':(statement,),
'statement':statement,
'terms_defined':tuple(terms_defined)})
else:
cur_name, line_num1, cur_definition_names, line_num2, unknown = prompt_reg.search(stderr).groups()
definitions_removed, definitions_shared, definitions_added = get_definitions_diff(last_definitions, cur_definition_names)
# first, to be on the safe side, we add the new
# definitions key to the dict, if it wasn't already there.
if cur_definition_names.strip('|') and cur_definition_names not in cur_definition:
cur_definition[cur_definition_names] = {'statements':[], 'terms_defined':[]}
log((statement, terms_defined, last_definitions, cur_definition_names, cur_definition.get(last_definitions, []), cur_definition.get(cur_definition_names, [])), level=2)
# first, we handle the case where we have just finished
# defining something. This should correspond to
# len(definitions_removed) > 0 and len(terms_defined) > 0.
# If only len(definitions_removed) > 0, then we have
# aborted something. If only len(terms_defined) > 0, then
# we have defined something with a one-liner.
if definitions_removed:
cur_definition[last_definitions]['statements'].append(statement)
cur_definition[last_definitions]['terms_defined'] += terms_defined
if cur_definition_names.strip('|'):
# we are still inside a definition. For now, we
# flatten all definitions.
#
# TODO(jgross): Come up with a better story for
# nested definitions.
cur_definition[cur_definition_names]['statements'] += cur_definition[last_definitions]['statements']
cur_definition[cur_definition_names]['terms_defined'] += cur_definition[last_definitions]['terms_defined']
del cur_definition[last_definitions]
else:
# we're at top-level, so add this as a new
# definition
rtn.append({'statements':tuple(cur_definition[last_definitions]['statements']),
'statement':'\n'.join(cur_definition[last_definitions]['statements']),
'terms_defined':tuple(cur_definition[last_definitions]['terms_defined'])})
del cur_definition[last_definitions]
#print('Adding:')
#print(rtn[-1])
elif terms_defined:
if cur_definition_names.strip('|'):
# we are still inside a definition. For now, we
# flatten all definitions.
#
# TODO(jgross): Come up with a better story for
# nested definitions.
cur_definition[cur_definition_names]['statements'].append(statement)
cur_definition[cur_definition_names]['terms_defined'] += terms_defined
else:
# we're at top level, so add this as a new
# definition
rtn.append({'statements':(statement,),
'statement':statement,
'terms_defined':tuple(terms_defined)})
# now we handle the case where we have just opened a fresh
# definition. We've already added the key to the
# dictionary.
elif definitions_added:
#print(definitions_added)
cur_definition[cur_definition_names]['statements'].append(statement)
else:
# if we're in a definition, append the statement to
# the queue, otherwise, just add it as it's own
# statement
if cur_definition_names.strip('|'):
cur_definition[cur_definition_names]['statements'].append(statement)
else:
rtn.append({'statements':(statement,),
'statement':statement,
'terms_defined':tuple()})
last_definitions = cur_definition_names
log((last_definitions, cur_definition_names), level=2)
if last_definitions.strip('||'):
rtn.append({'statements':tuple(cur_definition[cur_definition_names]['statements']),
'statement':'\n'.join(cur_definition[cur_definition_names]['statements']),
'terms_defined':tuple(cur_definition[cur_definition_names]['terms_defined'])})
del cur_definition[last_definitions]
#for i in rtn:
#print(i)
p.stdin.close()
return rtn
def join_definitions(definitions):
return '\n'.join(i['statement'] for i in definitions)