https://github.com/EasyCrypt/easycrypt
Tip revision: 82cc4d7ad1ca93dc93773f50b053591da4d479b1 authored by Pierre-Yves Strub on 09 June 2020, 19:55:20 UTC
binomial law + basic lemmas (full / support)
binomial law + basic lemmas (full / support)
Tip revision: 82cc4d7
runtest
#! /usr/bin/env python3
# --------------------------------------------------------------------
import sys, os, errno, re, glob, shutil, itertools, logging
import subprocess as sp, time, datetime, socket, io
# --------------------------------------------------------------------
import collections as cl, yaml
class folded_unicode(str):
    """Marker ``str`` subtype for text dumped as a YAML folded scalar (``>``)."""
class literal_unicode(str):
    """Marker ``str`` subtype for text dumped as a YAML literal scalar (``|``)."""
def folded_unicode_representer(dumper, data):
    """Represent *data* as a YAML string scalar using the folded (``>``) style.

    ``dumper`` is the object handed over by the YAML library; only its
    ``represent_scalar`` method is used. The node it builds is returned.
    """
    tag = 'tag:yaml.org,2002:str'
    return dumper.represent_scalar(tag, data, style='>')
def literal_unicode_representer(dumper, data):
    """Represent *data* as a YAML string scalar using the literal (``|``) style.

    ``dumper`` is the object handed over by the YAML library; only its
    ``represent_scalar`` method is used. The node it builds is returned.
    """
    tag = 'tag:yaml.org,2002:str'
    return dumper.represent_scalar(tag, data, style='|')
# Register the two representers with PyYAML so that values wrapped in
# `folded_unicode` / `literal_unicode` are dumped with the '>' / '|'
# scalar styles respectively.
yaml.add_representer(folded_unicode , folded_unicode_representer)
yaml.add_representer(literal_unicode, literal_unicode_representer)
# --------------------------------------------------------------------
class Object:
    """Plain attribute bag: every keyword argument becomes an instance attribute."""

    def __init__(self, **kw):
        # Store the keywords directly in the instance namespace.
        vars(self).update(kw)
# --------------------------------------------------------------------
class ANSIColor:
    """Minimal ANSI colouring helper for terminal output.

    The class attribute ``hascolors`` is computed once, right after the class
    definition, and decides whether ``color`` emits escape sequences.
    """

    # Offsets added to 30 to build the ANSI foreground colour code.
    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)

    @staticmethod
    def _hascolors():
        """Return True when stdout is a TTY whose terminfo entry reports
        more than two colours; False in every other case."""
        if not hasattr(sys.stdout, "isatty"):
            return False
        if not sys.stdout.isatty():
            return False
        try:
            import curses
            curses.setupterm()
            return curses.tigetnum("colors") > 2
        except Exception:
            # curses may be missing (ImportError) or the terminal unknown
            # (curses.error): degrade to plain text.  KeyboardInterrupt and
            # SystemExit are deliberately NOT swallowed.
            return False

    @staticmethod
    def color(txt, color):
        """Return ``txt`` wrapped in a bold ANSI colour sequence, or ``txt``
        itself when colours are disabled.

        ``color`` is one of the class constants (``RED``, ``GREEN``, ...).
        """
        if ANSIColor.hascolors:
            return "\x1b[1;%dm%s\x1b[0m" % (30 + color, txt)
        return txt


# Probe the terminal once at import time.
ANSIColor.hascolors = ANSIColor._hascolors()
def red(txt):
    """Return ``txt`` coloured red (unchanged when colours are disabled)."""
    return ANSIColor.color(txt, ANSIColor.RED)
def green(txt):
    """Return ``txt`` coloured green (unchanged when colours are disabled)."""
    return ANSIColor.color(txt, ANSIColor.GREEN)
def rcolor(txt, b):
    """Colour ``txt`` green when ``b`` is truthy, red otherwise."""
    if b:
        return green(txt)
    return red(txt)
# --------------------------------------------------------------------
def _options():
    """Parse the command line and the test configuration file.

    Command line: ``CONFIG [TARGET ...]`` plus the options declared below.
    ``CONFIG`` is an INI file with a ``[default]`` section (``bin``, ``args``)
    and one ``[test-NAME]`` section per scenario (``args``, ``okdirs``,
    ``kodirs``, ``exclude``; each defaults to the empty string).

    Returns an ``Object`` with the attributes ``scenarios`` (dict mapping
    scenario name to an ``Object`` with ``args``/``okdirs``/``kodirs``/
    ``exclude``), ``timeout``, ``timing``, ``jobs``, ``report``, ``bin``,
    ``args`` and ``targets``.

    Invalid input is reported through ``parser.error``, which prints the
    message and terminates the program.
    """
    import configparser as cp
    from optparse import OptionParser

    parser = OptionParser()

    parser.add_option(
        '', '--bin-args',
        action  = 'append',
        metavar = 'ARGS',
        default = [],
        help    = 'append ARGS to EasyCrypt command (cumulative)')

    parser.add_option(
        '', '--timeout',
        action  = 'store',
        default = None,
        metavar = 'TIMEOUT',
        type    = 'int',
        help    = 'set the timeout option to pass to EasyCrypt')

    parser.add_option(
        '', '--jobs',
        action  = 'store',
        default = 1,
        metavar = 'JOBS',
        type    = 'int',
        help    = 'number of maximum parallel test jobs')

    parser.add_option(
        '', '--timing',
        action  = 'store_true',
        default = False,
        help    = 'add timing statistics')

    parser.add_option(
        '', '--report',
        action  = 'store',
        default = None,
        metavar = 'FILE',
        help    = 'dump result to FILE')

    (cmdopt, args) = parser.parse_args()

    if len(args) < 1:
        parser.error('this program takes at least one argument')

    # Compare against None: a plain truthiness test lets `--timeout 0`
    # slip through the positivity check.
    if cmdopt.timeout is not None and cmdopt.timeout <= 0:
        parser.error('timeout must be positive')

    if cmdopt.jobs <= 0:
        parser.error('jobs must be positive')

    options = Object(scenarios = dict())
    options.timeout = cmdopt.timeout
    options.timing  = cmdopt.timing
    options.jobs    = cmdopt.jobs
    options.report  = cmdopt.report

    defaults = dict(args = '', exclude = '', okdirs = '', kodirs = '')

    # SafeConfigParser was removed in Python 3.12; ConfigParser has had the
    # same behaviour since Python 3.2.
    config = cp.ConfigParser(defaults)
    # `read` silently skips unreadable files and returns the list of files
    # actually parsed, so an empty result means the configuration is missing.
    if not config.read(args[0]):
        parser.error('cannot read configuration file: %s' % (args[0],))

    def resolve_targets(names):
        """Build the ordered, duplicate-free target list; `!name` removes
        a previously selected `name`."""
        targets = []
        for name in names:
            if name.startswith('!'):
                excluded = name[1:]
                # Must stay a list: later names are appended to it.
                targets = [x for x in targets if x != excluded]
            elif name not in targets:
                targets.append(name)
        return targets

    options.bin     = config.get('default', 'bin')
    options.args    = config.get('default', 'args').split()
    options.targets = resolve_targets(args[1:])

    if cmdopt.bin_args:
        options.args.extend(itertools.chain.from_iterable(
            x.split() for x in cmdopt.bin_args))

    def _parse_timeout(x):
        """Split `NAME:SECONDS` into `(NAME, int(SECONDS))`.

        NOTE(review): not called anywhere in this function.
        """
        m = re.search(r'^(.*):(\d+)$', x)
        if m is None:
            parser.error('invalid timeout: %s' % (x,))
        return (m.group(1), int(m.group(2)))

    prefix = 'test-'
    for test in [x for x in config.sections() if x.startswith(prefix)]:
        scenario = Object()
        scenario.args    = config.get(test, 'args').split()
        scenario.okdirs  = config.get(test, 'okdirs')
        scenario.kodirs  = config.get(test, 'kodirs')
        scenario.exclude = config.get(test, 'exclude').split()
        options.scenarios[test[len(prefix):]] = scenario

    for x in options.targets:
        if x not in options.scenarios:
            parser.error('unknown scenario: %s' % (x,))

    return options
# --------------------------------------------------------------------
def _dump_report(config, results, out):
    """Write a YAML report of ``results`` to the binary stream ``out``.

    Results are grouped by ``result.config.group``; one node is emitted per
    group, carrying the host name and timestamp from ``config``, the test and
    failure counts, and one detail entry per test.

    NOTE: the ``time`` field of every group is the total over *all* results,
    not just the group's own.
    """
    totaltime = sum(entry.time for entry in results)

    # Bucket the results by group, keeping first-seen order.
    grouped = dict()
    for entry in results:
        grouped.setdefault(entry.config.group, []).append(entry)

    def describe(entry):
        # One detail record; the name is "<stem> (<full path>)".
        stem = os.path.splitext(os.path.basename(entry.config.filename))[0]
        detail = cl.OrderedDict()
        detail['name']       = '%s (%s)' % (stem, entry.config.filename)
        detail['time']       = '%.3f' % (entry.time,)
        detail['success']    = entry.success
        detail['shouldpass'] = entry.config.isvalid
        detail['output']     = literal_unicode(entry.stderr.rstrip('\r\n'))
        return detail

    aout = []
    for gname, members in grouped.items():
        failed = sum(1 for entry in members if not entry.success)

        node = cl.OrderedDict()
        node['name']      = gname
        node['hostname']  = config.hostname
        node['timestamp'] = config.timestamp.isoformat()
        node['tests']     = len(members)
        node['failures']  = failed
        node['time']      = '%.3f' % totaltime
        node['details']   = [describe(entry) for entry in members]
        aout.append(node)

    # With an explicit encoding, yaml.dump returns bytes.
    out.write(yaml.dump(aout, default_flow_style = None, encoding = 'utf-8'))
# --------------------------------------------------------------------
# Directory under which per-script timing statistics are written.
TIMING_DIR = 'timing'


def _run_test(config, options):
    """Run the EasyCrypt binary on one script and report the outcome.

    ``config`` describes the script (``filename``, ``args``, ``isvalid``);
    ``options`` carries the global settings (``bin``, ``args``, ``timeout``,
    ``timing``).

    Returns an ``Object`` with ``success``, ``config``, ``time`` (wall-clock
    seconds) and ``stderr`` (decoded standard error of the process). A test
    succeeds when the exit status is zero for a script expected to be valid,
    or non-zero for one expected to be invalid.

    Terminates the program with status 1 if the binary cannot be started.
    """
    logging.info("running ec on `%s' [valid: %s]" %
                 (config.filename, config.isvalid))

    started = time.time()

    try:
        command = [options.bin] + options.args + config.args
        if options.timeout:
            command.extend(['-timeout', str(options.timeout)])
        if options.timing:
            tfilename = os.path.join(
                TIMING_DIR, os.path.splitext(config.filename)[0] + '.stats')
            os.makedirs(os.path.dirname(tfilename), exist_ok = True)
            command.extend(['-tstats', tfilename])
        command.extend([config.filename])

        logging.info('command: %r' % (command,))

        process = sp.Popen(command, stdout = sp.PIPE, stderr = sp.PIPE)

        try:
            _out, err = process.communicate()
            status = process.poll()
        finally:
            # Only reached with a live child when communicate() was
            # interrupted: kill and reap it so it does not linger.
            if process.poll() is None:
                try:
                    process.kill()
                    process.wait()
                except OSError:
                    pass

    except OSError as e:
        logging.error("cannot run `%s': %s" % (options.bin, e))
        sys.exit(1)

    elapsed = time.time() - started
    success = (bool(status) != bool(config.isvalid))
    err = err.decode('utf-8', errors = 'ignore')

    logging.info("result for `%s': success: %s" %
                 (config.filename, rcolor(success, success)))

    return Object(success = success,
                  config  = config ,
                  time    = elapsed,
                  stderr  = err    )
# --------------------------------------------------------------------
def _main():
    """Program entry point.

    Reads the options, collects the scripts of every selected scenario, runs
    them (in a thread pool when ``options.jobs > 1``), logs the failures,
    optionally writes the YAML report and exits with status 2 if any script
    failed, 0 otherwise.
    """
    # ------------------------------------------------------------------
    options = _options()

    # The thread name is only worth showing when tests run in parallel.
    base_format = '%(asctime)-15s - %(levelname)s - %(message)s'
    if options.jobs > 1:
        logfmt = '%s %s' % ('[%(threadName)s]', base_format)
    else:
        logfmt = base_format

    logging.basicConfig(
        stream = sys.stderr,
        level  = logging.DEBUG,
        format = logfmt)

    # ------------------------------------------------------------------
    def scripts_in(directory):
        # One test configuration per `.ec` / `.eca` file of the directory.
        logging.debug("gathering scripts in `%s'" % (directory.src,))
        try:
            entries = os.listdir(directory.src)
        except OSError as e:
            logging.warning("cannot scan `%s': %s" % (directory.src, e))
            return []
        names = [name for name in entries if re.search(r'\.eca?$', name)]
        names.sort()
        logging.debug("%.4d script(s) found in `%s'" % (len(names), directory.src))
        return [
            Object(isvalid  = directory.valid,
                   group    = directory.src,
                   args     = directory.args,
                   filename = os.path.join(directory.src, name))
            for name in names
        ]

    def is_excluded(src, excludes):
        # `!prefix` excludes every path starting with prefix; anything else
        # must match the path exactly.
        return any(
            src.startswith(rule[1:]) if rule.startswith('!') else src == rule
            for rule in excludes)

    def expand(spec):
        # `!dir` stands for dir and all of its sub-directories; any other
        # whitespace-separated item is a glob pattern.
        found = []
        for pattern in re.split(r'\s+', spec):
            if pattern.startswith('!'):
                top = pattern[1:]
                found.append(top)
                for root, dnames, _ in os.walk(top):
                    found.extend([os.path.join(root, d) for d in dnames])
            else:
                found.extend(glob.glob(pattern))
        return found

    def scripts_for(scenario):
        candidates = [
            Object(src = d, valid = True, args = scenario.args)
            for d in expand(scenario.okdirs)
        ]
        candidates += [
            Object(src = d, valid = False, args = scenario.args)
            for d in expand(scenario.kodirs)
        ]
        kept = [c for c in candidates
                if not is_excluded(c.src, scenario.exclude)]
        return [script for c in kept for script in scripts_in(c)]

    allscripts = [
        script
        for target in options.targets
        for script in scripts_for(options.scenarios[target])
    ]

    logging.debug("%.4d script(s) in total" % (len(allscripts),))

    # --------------------------------------------------------------------
    mainconfig = Object(
        hostname  = socket.gethostname(),
        timestamp = datetime.datetime.utcnow())

    if options.jobs > 1:
        import concurrent.futures as futures

        with futures.ThreadPoolExecutor(
                options.jobs, thread_name_prefix = 'Thread') as executor:
            result = list(executor.map(
                lambda config : _run_test(config, options),
                allscripts))
    else:
        result = [_run_test(config, options) for config in allscripts]

    errors = [outcome for outcome in result if not outcome.success]
    nerrs  = len(errors)

    logging.info(rcolor("# of failed scripts: %d" % (nerrs,), nerrs == 0))
    if errors:
        logging.info("--- BEGIN FAILING SCRIPTS ---")
        for failed in errors:
            logging.info(failed.config.filename)
        logging.info("--- END FAILING SCRIPTS ---")
        logging.critical("some tests did NOT pass")

    if options.report is not None:
        # Binary mode: the report is written as UTF-8 encoded bytes.
        with open(options.report, 'wb') as output:
            _dump_report(mainconfig, result, output)

    exit (2 if errors else 0)
# --------------------------------------------------------------------
# Run the test driver only when this file is executed as a script.
if __name__ == '__main__':
    _main()