https://github.com/EasyCrypt/easycrypt
Tip revision: 3f4a0bd5596888cd8d28b97687d477942187aa5f authored by Pierre-Yves Strub on 11 June 2022, 06:10:21 UTC
In loop fusion/fission, add more constraints on the epilog
In loop fusion/fission, add more constraints on the epilog
Tip revision: 3f4a0bd
runtest
#! /usr/bin/env python3
# --------------------------------------------------------------------
import abc
import asyncio
import collections
import contextlib as clib
import curses
import datetime
import glob
import itertools
import math
import multiprocessing as mp
import os
import re
import signal
import socket
import sys
import termios
import time
import yaml
# --------------------------------------------------------------------
class folded_unicode(str):
pass
class literal_unicode(str):
pass
def folded_unicode_representer(dumper, data):
return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='>')
def literal_unicode_representer(dumper, data):
return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|')
yaml.add_representer(folded_unicode , folded_unicode_representer)
yaml.add_representer(literal_unicode, literal_unicode_representer)
# --------------------------------------------------------------------
@clib.asynccontextmanager
async def awaitable(the):
try:
yield the
finally:
await the.wait()
clib.awaitable = awaitable
# --------------------------------------------------------------------
class Object:
def __init__(self, **kw):
self.__dict__.update(kw)
# --------------------------------------------------------------------
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
# --------------------------------------------------------------------
def _options():
import configparser as cp
from optparse import OptionParser
parser = OptionParser()
parser.add_option(
'', '--bin-args',
action = 'append',
metavar = 'ARGS',
default = [],
help = 'append ARGS to EasyCrypt command (cumulative)')
parser.add_option(
'', '--timeout',
action = 'store',
default = None,
metavar = 'TIMEOUT',
type = 'int',
help = 'set the timeout option to pass to EasyCrypt')
parser.add_option(
'', '--jobs',
action = 'store',
default = None,
metavar = 'JOBS',
type = 'int',
help = 'number of maximum parallel test jobs')
parser.add_option(
'', '--timing',
action = 'store_true',
default = False,
help = 'add timing statistics')
parser.add_option(
'', '--report',
action = 'store',
default = None,
metavar = 'FILE',
help = 'dump result to FILE')
(cmdopt, args) = parser.parse_args()
if len(args) < 1:
parser.error('this program takes at least one argument')
if cmdopt.timeout is not None:
if cmdopt.timeout <= 0:
parser.error('timeout must be positive')
if cmdopt.jobs is not None:
if cmdopt.jobs < 0:
parser.error('jobs must be non-negative')
options = Object(scenarios = dict())
options.timeout = cmdopt.timeout
options.timing = cmdopt.timing
options.jobs = cmdopt.jobs
options.report = cmdopt.report
defaults = dict(args = '', exclude = '', okdirs = '', kodirs = '')
config = cp.ConfigParser(defaults)
config.read(args[0])
def resolve_targets(names):
targets = []
for name in names:
if name.startswith('!'):
targets = filter(lambda x : x != name[1:], targets)
else:
if name not in targets:
targets.append(name)
return targets
options.bin = config.get('default', 'bin')
options.args = config.get('default', 'args').split()
options.targets = resolve_targets(args[1:])
if cmdopt.bin_args:
options.args.extend(itertools.chain.from_iterable( \
x.split() for x in cmdopt.bin_args))
def _parse_timeout(x):
m = re.search(r'^(.*):(\d+)$', x)
if m is None:
parse.parseerror('invalid timeout: %s' % (x,))
return (m.group(1), int(m.group(2)))
for test in [x for x in config.sections() if x.startswith('test-')]:
scenario = Object()
scenario.args = config.get(test, 'args').split()
scenario.okdirs = config.get(test, 'okdirs')
scenario.kodirs = config.get(test, 'kodirs')
scenario.exclude = config.get(test, 'exclude').split()
options.scenarios[test[5:]] = scenario
for x in options.targets:
if x not in options.scenarios:
parser.error('unknown scenario: %s' % (x,))
return options
# --------------------------------------------------------------------
class Listener(abc.ABC):
REFRESH = None
@abc.abstractmethod
def file_check_start(self, filename):
pass
@abc.abstractmethod
def file_check_progress(self, handle, status):
pass
@abc.abstractmethod
def file_check_done(self, handle, status):
pass
@abc.abstractmethod
def error(self, handle, level, msg):
pass
@abc.abstractmethod
def tick(self):
pass
@abc.abstractmethod
def start(self):
pass
# --------------------------------------------------------------------
class CurseWrapper(metaclass = Singleton):
COLOR_RED = curses.COLOR_RED
COLOR_GREEN = curses.COLOR_GREEN
COLOR_YELLOW = curses.COLOR_YELLOW
COLOR_BLUE = curses.COLOR_BLUE
COLOR_CODES = (
COLOR_RED ,
COLOR_GREEN ,
COLOR_YELLOW,
COLOR_BLUE ,
)
def __init__(self):
curses.setupterm()
self._codes = dict(
cup = curses.tigetstr('cuu1' ),
cdw = curses.tigetstr('cud1' ),
sv = curses.tigetstr('sc' ),
cr = curses.tigetstr('rc' ),
el = curses.tigetstr('el' ),
setaf = curses.tigetstr('setaf'),
sgr0 = curses.tigetstr('sgr0' ),
)
self._colors = {
x: curses.tparm(self.codes['setaf'], x) for x in self.COLOR_CODES
}
self._autoflush = 0
self._stream = sys.stdout
codes = property(lambda self : self._codes)
colors = property(lambda self : self._colors)
def acquire(self):
self._autoflush += 1
def release(self):
assert (self._autoflush > 0)
self._autoflush -= 1
if self._autoflush == 0:
self._stream.flush()
def rawwrite(self, data, flush = None):
self._stream.buffer.write(data)
if flush or self._autoflush == 0:
self._stream.flush()
def write(self, data, flush = None):
self.rawwrite(data.encode('utf-8'), flush)
def cup(self, n = 1):
self.rawwrite(self.codes['cup'] * n)
def cdown(self, n = 1):
self.rawwrite(self.codes['cdw'] * n)
def csave(self):
self.rawwrite(self.codes['sv'])
def crestore(self):
self.rawwrite(self.codes['cr'])
def clearline(self):
self.rawwrite(self.codes['el'])
def colored(self, txt, color):
icolor = self.colors[color].decode('ascii')
ocolor = self.codes['sgr0'].decode('ascii')
return f'{icolor}{txt}{ocolor}'
@clib.contextmanager
def cursor(self):
self.csave()
try:
yield
finally:
self.crestore()
def echooff(self):
fdesc = self._stream.fileno()
attrs = termios.tcgetattr(fdesc)
attrs[3] &= ~termios.ECHO
termios.tcsetattr(fdesc, termios.TCSADRAIN, attrs)
def echoon(self):
fdesc = self._stream.fileno()
attrs = termios.tcgetattr(fdesc)
attrs[3] |= termios.ECHO
termios.tcsetattr(fdesc, termios.TCSADRAIN, attrs)
# --------------------------------------------------------------------
class Gather:
@staticmethod
def is_excluded(src, excludes):
for path in excludes:
if path.startswith('!'):
if src.startswith(path[1:]):
return True
elif src == path:
return True
return False
@classmethod
def gather(cls, obj, scenario):
try:
scripts = os.listdir(obj.src)
except OSError as e:
logging.warning("cannot scan `%s': %s" % (obj.src, e))
return []
scripts = sorted([x for x in scripts if re.search(r'\.eca?$', x)])
def config(filename):
fullname = os.path.join(obj.src, filename)
return Object(isvalid = obj.valid,
group = obj.src,
args = obj.args,
filename = fullname)
return [config(x) for x in scripts]
@classmethod
def gather_for_scenario(cls, scenario):
def expand(dirs):
def for1(x):
aout = []
if x.startswith('!'):
aout.append(x[1:])
for root, dnames, _ in os.walk(x[1:]):
aout.extend([os.path.join(root, x) for x in dnames])
else:
aout.extend(glob.glob(x))
return aout
dirs = [for1(x) for x in re.split(r'\s+', dirs)]
return list(itertools.chain.from_iterable(dirs))
dirs = []
dirs.extend([Object(src = x, valid = True , args = scenario.args) \
for x in expand(scenario.okdirs)])
dirs.extend([Object(src = x, valid = False, args = scenario.args) \
for x in expand(scenario.kodirs)])
dirs = [x for x in dirs if not cls.is_excluded(x.src,scenario.exclude)]
dirs = map(lambda x : cls.gather(x, scenario), dirs)
return list(itertools.chain.from_iterable(dirs))
@classmethod
def gatherall(cls, scenarios, targets):
dirs = [scenarios[x] for x in targets]
dirs = map(lambda x : cls.gather_for_scenario(x), dirs)
return list(itertools.chain.from_iterable(dirs))
# ---------------------------------------------------------------------
def format_delta(delta, hprec=2, prec=0, subprec=0):
if prec > 0:
frac, delta = math.modf(delta)
delta, frac = int(delta), int(10 ** prec * frac)
if subprec > 0:
subprec = 10 ** prec // subprec
frac = frac // subprec * subprec
return '{1:{0}d}:{2:02d}.{4:0{3}d}'.format(
hprec, delta // 60, delta % 60, prec, frac)
return '{1:{0}d}:{2:02d}'.format(hprec, int(delta) // 60, int(delta) % 60)
# ---------------------------------------------------------------------
class MBar:
def __init__(self, ntasks):
self.bars = []
self.term = CurseWrapper()
self.ntasks = ntasks
self.stats = dict(success = 0, failure = 0, running = 0, waiting = ntasks)
self.timestamp = time.time()
with self.term.cursor():
self.term.rawwrite(self.headline())
self.term.cdown()
def headline(self):
headline = '⁕ [{}] Success: {}, Failure: {}, Running: {}, Waiting: {}'
headline = headline.format(
format_delta(time.time() - self.timestamp),
*[self.term.colored(str(self.stats[x]), c) for x, c in [
('success', CurseWrapper.COLOR_GREEN ),
('failure', CurseWrapper.COLOR_RED ),
('running', CurseWrapper.COLOR_YELLOW),
('waiting', CurseWrapper.COLOR_BLUE ),
]])
return headline.encode('utf-8')
def create(self, name):
with self:
assert (self.stats['waiting'] > 0)
self._clear_bars_display()
self.stats['waiting'] -= 1
self.stats['running'] += 1
self.bars.append(MBarLine(self, name))
self._create_bars_display()
return self.bars[-1]
def write_for(self, bar):
with self:
index = self.bars.index(bar)
txt = self.render(bar)
with self.term.cursor():
self.term.cup(len(self.bars) - index)
self.term.clearline()
self.term.rawwrite(txt)
def write(self, txt):
with self:
self._clear_bars_display()
self.term.rawwrite(('▶ ' + txt).encode('utf-8') + b'\n')
self._create_bars_display()
def log(self, mark, txt, color):
self.write('[{}] {}'.format(self.term.colored(mark, color), txt))
def remove(self, bar, success):
with self:
assert (self.stats['running'] > 0)
self._clear_bars_display()
self.stats['running'] -= 1
self.stats['success' if success else 'failure'] += 1
self.bars[:] = [x for x in self.bars if x is not bar]
self._create_bars_display()
def render(self, bar):
return '⚙ '.encode('utf-8') + bar.render()
def refresh(self):
with self:
self._clear_bars_display()
self._create_bars_display()
def __enter__(self):
self.term.acquire()
def __exit__(self, exc_type, exc_value, traceback):
self.term.release()
def _clear_bars_display(self):
for _ in range(len(self.bars)+1):
self.term.cup()
self.term.clearline()
def _create_bars_display(self):
with self.term.cursor():
self.term.rawwrite(self.headline())
self.term.cdown()
for bar in self.bars:
with self.term.cursor():
self.term.rawwrite(self.render(bar))
self.term.cdown()
# --------------------------------------------------------------------
class MBarLine:
BLACK = '█'
BLANK = '-'
WIDTH = 50
def __init__(self, mbar, name):
self.mbar = mbar
self.value = 0.0
self.name = name
self.now = time.time()
self.last = time.time()
def update(self, value):
self.value = max(min(float(value), 1.0), 0.0)
self.last = time.time()
self.mbar.write_for(self)
def render(self):
wtg = time.time() - self.last
wtg = ' ' * 7 if wtg < 1.0 else ('+' + format_delta(wtg, hprec=0, prec=1))
bar = round(self.WIDTH * self.value)
bar = (self.BLACK * bar) + (self.BLANK * (self.WIDTH - bar))
bar = '[{:6.2f}%] {} [{} {}] [{}]'.format(
100 * self.value, bar,
format_delta(self.last - self.now), wtg, self.name)
return bar.encode('utf-8')
def finish(self, success):
self.mbar.remove(self, success)
# --------------------------------------------------------------------
MARKS = {
0:
('✓', CurseWrapper.COLOR_GREEN),
-signal.SIGINT:
('ϟ', CurseWrapper.COLOR_YELLOW),
-signal.SIGTERM:
('ϟ', CurseWrapper.COLOR_YELLOW),
None:
('✗', CurseWrapper.COLOR_RED),
}
# --------------------------------------------------------------------
LOGMARKS = {
'critical':
('✗', CurseWrapper.COLOR_RED),
'warning':
('ϟ', CurseWrapper.COLOR_YELLOW),
'info':
('ⓘ', CurseWrapper.COLOR_BLUE),
'debug':
('ⓘ', CurseWrapper.COLOR_BLUE),
None:
('✗', CurseWrapper.COLOR_RED),
}
# --------------------------------------------------------------------
class TermListener(Listener):
REFRESH = 0.1
def __init__(self, allscripts):
self._mbar = MBar(len(allscripts))
self._handles = {}
def file_check_start(self, filename):
handle = Object(
filename = filename,
timestamp = time.time(),
bar = self._mbar.create(filename),
)
self._handles[id(handle)] = handle
return id(handle)
def file_check_progress(self, handle, progress):
handle = self._handles[handle]
handle.bar.update(progress)
def file_check_done(self, handle, status, success):
handle = self._handles[handle]
mark, color = MARKS.get(status, MARKS[None])
self._mbar.log(mark, '[{}] {}'.format(
format_delta(time.time() - handle.timestamp, prec=1), handle.filename
), color)
handle.bar.finish(success = success)
del self._handles[id(handle)]
def tick(self):
self._mbar.refresh()
def error(self, handle, level, msg):
handle = self._handles[handle]
mark, color = LOGMARKS.get(level, LOGMARKS[None])
self._mbar.log(mark, '{}: {}'.format(handle.filename, msg), color)
@clib.contextmanager
def start(self):
self._mbar.term.echooff()
yield
self._mbar.term.echoon()
# --------------------------------------------------------------------
class RawListener(Listener):
def __init__(self, allscripts):
self._handles = {}
self._total = len(allscripts)
self._checked = 0
self.log(f'[ ] number of files to check: {self._total}')
def log(self, msg):
sys.stdout.write(msg + '\n')
sys.stdout.flush()
def file_check_start(self, filename):
handle = Object(
filename = filename,
timestamp = time.time(),
)
self._handles[id(handle)] = handle
return id(handle)
def file_check_progress(self, handle, progress):
pass
def file_check_done(self, handle, status, success):
handle = self._handles[handle]
self._checked += 1
mark, _ = MARKS.get(status, MARKS[None])
delta = format_delta(time.time() - handle.timestamp, prec=1)
self.log(f'[{mark}] [{self._checked:04d}/{self._total:04d}] [{delta}] {handle.filename}')
del self._handles[id(handle)]
def error(self, handle, level, msg):
handle, (mark, _) = self._handles[handle], LOGMARKS.get(level, LOGMARKS[None])
self.log('[{}] {}: {}'.format(mark, handle.filename, msg))
def tick(self):
pass
@clib.contextmanager
def start(self):
yield
# --------------------------------------------------------------------
async def _run_all(options, allscripts, listener : Listener):
semaphore = asyncio.Semaphore(options.jobs or mp.cpu_count())
async def runcheck(config):
async with semaphore:
command = [options.bin] + options.args + config.args
if options.timeout:
command.extend(['-timeout', str(options.timeout)])
if options.timing:
tfilename = os.path.join(
TIMING_DIR, os.path.splitext(config.filename)[0] + '.stats')
os.makedirs(os.path.dirname(tfilename), exist_ok = True)
command.extend(['-tstats', tfilename])
handle = listener.file_check_start(config.filename)
command.extend(['-script', '-no-eco', config.filename])
args = dict(
stdout = asyncio.subprocess.DEVNULL,
stderr = asyncio.subprocess.PIPE,
)
time0 = time.time()
proc = await asyncio.create_subprocess_exec(*command, **args)
errors = []
try:
async with clib.awaitable(proc) as proc:
async for line in proc.stderr:
if re.match(rb'P\b', line):
listener.file_check_progress(handle, float(line.split()[3]))
elif re.match(rb'E\b', line):
error = line[1:].decode('utf-8').strip()
error = error.split(None, 1)
if not error:
continue
if len(error) < 2:
error = [None, error]
listener.error(handle, *error)
errors.append(error)
finally:
try:
status = await asyncio.wait_for(proc.wait(), 2.0)
except asyncio.TimeoutError:
proc.kill(); status = await proc.wait()
success = (bool(status) != bool(config.isvalid))
listener.file_check_done(handle, status, success)
return Object(success = success,
config = config ,
errors = errors ,
duration = time.time() - time0)
with listener.start():
log = []
jobs = [asyncio.ensure_future(runcheck(cfg)) for cfg in allscripts]
while jobs:
done, jobs = await asyncio.wait(
jobs,
timeout = listener.REFRESH,
return_when = asyncio.FIRST_COMPLETED)
listener.tick()
for aout in done:
try:
log.append(aout.result())
except KeyboardInterrupt:
pass
return log
# --------------------------------------------------------------------
def _dump_report(config, results, stream):
duration = sum([x.duration for x in results])
grouped = dict()
aout = []
for result in results:
grouped.setdefault(result.config.group, []).append(result)
for gname, group in grouped.items():
ok = [x for x in group if x.success]
ko = [x for x in group if not x.success]
node = {}
node['name'] = gname
node['hostname'] = config.hostname
node['timestamp'] = config.timestamp.isoformat()
node['tests'] = len(group)
node['failures'] = len(ko)
node['time'] = '%.3f' % duration
node['details'] = []
for result in group:
subnode = {}
name = os.path.basename(result.config.filename)
name = os.path.splitext(name)[0]
name = '%s (%s)' % (name, result.config.filename)
subnode['name'] = name
subnode['time'] = '%.3f' % (result.duration,)
subnode['success'] = result.success
subnode['shouldpass'] = result.config.isvalid
subnode['errors'] = [literal_unicode(x) for x in result.errors]
node['details'].append(subnode)
aout.append(node)
opts = dict(
default_flow_style = None ,
encoding = 'utf-8',
sort_keys = False ,
)
stream.write(yaml.dump(aout, **opts))
# --------------------------------------------------------------------
async def _main():
mainconfig = Object()
mainconfig.hostname = socket.gethostname()
mainconfig.timestamp = datetime.datetime.utcnow()
options = _options()
allscripts = Gather.gatherall(options.scenarios, options.targets)
listener = None
if sys.stdout.isatty:
try:
listener = TermListener(allscripts)
except curses.error:
pass
if listener is None:
listener = RawListener(allscripts)
log = await _run_all(options, allscripts, listener)
if options.report is not None:
with open(options.report, 'wb') as output:
_dump_report(mainconfig, log, output)
haserrors = any(not x.success for x in log)
exit(2 if haserrors else 0)
# --------------------------------------------------------------------
if __name__ == '__main__':
try:
asyncio.run(_main())
except KeyboardInterrupt:
pass