https://github.com/EasyCrypt/easycrypt
Tip revision: 846710a2a656834065e745d19416ebdc83158f55 authored by Benjamin Gregoire on 14 July 2019, 06:50:07 UTC
Start restructuration of the code to be able to avant mutual dependency between type and mpath
Start restructuration of the code to be able to avant mutual dependency between type and mpath
Tip revision: 846710a
runtest
#! /usr/bin/env python3
# --------------------------------------------------------------------
import sys, os, errno, re, glob, shutil, itertools, logging
import subprocess as sp, time, datetime, socket, io
# --------------------------------------------------------------------
import collections as cl, yaml
class folded_unicode(str):
pass
class literal_unicode(str):
pass
def folded_unicode_representer(dumper, data):
return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='>')
def literal_unicode_representer(dumper, data):
return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|')
yaml.add_representer(folded_unicode , folded_unicode_representer)
yaml.add_representer(literal_unicode, literal_unicode_representer)
# --------------------------------------------------------------------
class Object(object):
def __init__(self, **kw):
self.__dict__.update(kw)
# --------------------------------------------------------------------
class ANSIColor(object):
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
@staticmethod
def _hascolors():
if not hasattr(sys.stdout, "isatty"):
return False
if not sys.stdout.isatty():
return False
try:
import curses
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
return False
@staticmethod
def color(txt, color):
if ANSIColor.hascolors:
return "\x1b[1;%dm%s\x1b[0m" % (30+color, txt)
return txt
ANSIColor.hascolors = ANSIColor._hascolors()
def red (txt): return ANSIColor.color(txt, ANSIColor.RED )
def green(txt): return ANSIColor.color(txt, ANSIColor.GREEN)
def rcolor(txt, b):
return (green if b else red)(txt)
# --------------------------------------------------------------------
def _options():
import configparser as cp
from optparse import OptionParser
parser = OptionParser()
parser.add_option(
'', '--bin-args',
action = 'append',
metavar = 'ARGS',
default = [],
help = 'append ARGS to EasyCrypt command (cumulative)')
parser.add_option(
'', '--timeout',
action = 'store',
default = None,
metavar = 'TIMEOUT',
type = 'int',
help = 'set the timeout option to pass to EasyCrypt')
parser.add_option(
'', '--jobs',
action = 'store',
default = 1,
metavar = 'JOBS',
type = 'int',
help = 'number of maximum parallel test jobs')
parser.add_option(
'', '--timing',
action = 'store_true',
default = False,
help = 'add timing statistics')
parser.add_option(
'', '--report',
action = 'store',
default = None,
metavar = 'FILE',
help = 'dump result to FILE')
(cmdopt, args) = parser.parse_args()
if len(args) < 1:
parser.error('this program takes at least one argument')
if cmdopt.timeout:
if cmdopt.timeout <= 0:
parser.error('timeout must be positive')
if cmdopt.jobs <= 0:
parse.error('jobs must be positive')
options = Object(scenarios = dict())
options.timeout = cmdopt.timeout
options.timing = cmdopt.timing
options.jobs = cmdopt.jobs
options.report = cmdopt.report
defaults = dict(args = '', exclude = '', okdirs = '', kodirs = '')
config = cp.SafeConfigParser(defaults)
config.read(args[0])
def resolve_targets(names):
targets = []
for name in names:
if name.startswith('!'):
targets = filter(lambda x : x != name[1:], targets)
else:
if name not in targets:
targets.append(name)
return targets
options.bin = config.get('default', 'bin')
options.args = config.get('default', 'args').split()
options.targets = resolve_targets(args[1:])
if cmdopt.bin_args:
options.args.extend(itertools.chain.from_iterable( \
x.split() for x in cmdopt.bin_args))
def _parse_timeout(x):
m = re.search(r'^(.*):(\d+)$', x)
if m is None:
parse.parseerror('invalid timeout: %s' % (x,))
return (m.group(1), int(m.group(2)))
for test in [x for x in config.sections() if x.startswith('test-')]:
scenario = Object()
scenario.args = config.get(test, 'args').split()
scenario.okdirs = config.get(test, 'okdirs')
scenario.kodirs = config.get(test, 'kodirs')
scenario.exclude = config.get(test, 'exclude').split()
options.scenarios[test[5:]] = scenario
for x in options.targets:
if x not in options.scenarios:
parser.error('unknown scenario: %s' % (x,))
return options
# --------------------------------------------------------------------
def _dump_report(config, results, out):
totaltime = sum([x.time for x in results])
grouped = dict()
aout = []
for result in results:
grouped.setdefault(result.config.group, []).append(result)
for gname, group in grouped.items():
ok = [x for x in group if x.success]
ko = [x for x in group if not x.success]
node = cl.OrderedDict()
node['name'] = gname
node['hostname'] = config.hostname
node['timestamp'] = config.timestamp.isoformat()
node['tests'] = len(group)
node['failures'] = len(ko)
node['time'] = '%.3f' % totaltime
node['details'] = []
for result in group:
subnode = cl.OrderedDict()
name = os.path.basename(result.config.filename)
name = os.path.splitext(name)[0]
name = '%s (%s)' % (name, result.config.filename)
subnode['name'] = name
subnode['time'] = '%.3f' % (result.time,)
subnode['success'] = result.success
subnode['shouldpass'] = result.config.isvalid
subnode['output'] = literal_unicode(result.stderr.rstrip('\r\n'))
node['details'].append(subnode)
aout.append(node)
opts = dict(default_flow_style = None, encoding = 'utf-8')
out.write(yaml.dump(aout, **opts))
# --------------------------------------------------------------------
TIMING_DIR = 'timing'
def _run_test(config, options):
logging.info("running ec on `%s' [valid: %s]" % \
(config.filename, config.isvalid))
timestamp = time.time()
try:
command = [options.bin] + options.args + config.args
if options.timeout:
command.extend(['-timeout', str(options.timeout)])
if options.timing:
tfilename = os.path.join(
TIMING_DIR, os.path.splitext(config.filename)[0] + '.stats')
os.makedirs(os.path.dirname(tfilename), exist_ok = True)
command.extend(['-tstats', tfilename])
command.extend([config.filename])
logging.info('command: %r' % (command,))
process = sp.Popen(command, stdout = sp.PIPE, stderr = sp.PIPE)
try:
out, err = process.communicate()
status = process.poll()
finally:
try : sp.kill()
except: pass
except OSError as e:
logging.error("cannot run `%s': %s" % (options.bin, e))
exit (1)
timestamp = time.time() - timestamp
success = (bool(status) != bool(config.isvalid))
out = out.decode('utf-8', errors='ignore')
err = err.decode('utf-8', errors='ignore')
logging.info("result for `%s': success: %s" % \
(config.filename, rcolor(success, success)))
return Object(success = success ,
config = config ,
time = timestamp,
stderr = err )
# --------------------------------------------------------------------
def _main():
# ------------------------------------------------------------------
options = _options()
logfmt = '%(asctime)-15s - %(levelname)s - %(message)s'
if options.jobs > 1:
logfmt = '%s %s' % ('[%(threadName)s]', logfmt)
logging.basicConfig(
stream = sys.stderr,
level = logging.DEBUG,
format = logfmt)
# ------------------------------------------------------------------
def gather(obj, scenario):
logging.debug("gathering scripts in `%s'" % (obj.src,))
try:
scripts = os.listdir(obj.src)
except OSError as e:
logging.warning("cannot scan `%s': %s" % (obj.src, e))
return []
scripts = sorted([x for x in scripts if re.search(r'\.eca?$', x)])
logging.debug("%.4d script(s) found in `%s'" % (len(scripts), obj.src))
def config(filename):
fullname = os.path.join(obj.src, filename)
return Object(isvalid = obj.valid,
group = obj.src,
args = obj.args,
filename = fullname)
return [config(x) for x in scripts]
def is_excluded(src,excludes):
for path in excludes:
if path.startswith('!'):
if src.startswith(path[1:]):
return True
elif src == path:
return True
return False
def gather_for_scenario(scenario):
def expand(dirs):
def for1(x):
aout = []
if x.startswith('!'):
aout.append(x[1:])
for root, dnames, _ in os.walk(x[1:]):
aout.extend([os.path.join(root, x) for x in dnames])
else:
aout.extend(glob.glob(x))
return aout
dirs = [for1(x) for x in re.split(r'\s+', dirs)]
return list(itertools.chain.from_iterable(dirs))
dirs = []
dirs.extend([Object(src = x, valid = True , args = scenario.args) \
for x in expand(scenario.okdirs)])
dirs.extend([Object(src = x, valid = False, args = scenario.args) \
for x in expand(scenario.kodirs)])
dirs = [x for x in dirs if not is_excluded(x.src,scenario.exclude)]
dirs = map(lambda x : gather(x, scenario), dirs)
return list(itertools.chain.from_iterable(dirs))
def gatherall():
dirs = [options.scenarios[x] for x in options.targets]
dirs = map(lambda x : gather_for_scenario(x), dirs)
return list(itertools.chain.from_iterable(dirs))
allscripts = gatherall()
logging.debug("%.4d script(s) in total" % (len(allscripts,)))
# --------------------------------------------------------------------
mainconfig = Object()
mainconfig.hostname = socket.gethostname()
mainconfig.timestamp = datetime.datetime.utcnow()
if options.jobs > 1:
import concurrent.futures as futures
if options.jobs > 1:
with futures.ThreadPoolExecutor(options.jobs, thread_name_prefix = 'Thread') \
as executor:
result = list(executor.map(
lambda config : _run_test(config, options),
allscripts))
else:
result = []
for config in allscripts:
result.append(_run_test(config, options))
errors = [x for x in result if not x.success]
nerrs = len(errors)
logging.info(rcolor("# of failed scripts: %d" % (nerrs,), nerrs == 0))
if errors:
logging.info("--- BEGIN FAILING SCRIPTS ---")
for error in errors:
logging.info(error.config.filename)
logging.info("--- END FAILING SCRIPTS ---")
logging.critical("some tests did NOT pass")
if options.report is not None:
with open(options.report, 'wb') as output:
_dump_report(mainconfig, result, output)
exit (2 if errors else 0)
# --------------------------------------------------------------------
if __name__ == '__main__':
_main()