#! /usr/bin/env python3 # -------------------------------------------------------------------- import sys, os, errno, re, glob, shutil, itertools, logging import subprocess as sp, time, datetime, socket, io # -------------------------------------------------------------------- import collections as cl, yaml class folded_unicode(str): pass class literal_unicode(str): pass def folded_unicode_representer(dumper, data): return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='>') def literal_unicode_representer(dumper, data): return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|') yaml.add_representer(folded_unicode , folded_unicode_representer) yaml.add_representer(literal_unicode, literal_unicode_representer) # -------------------------------------------------------------------- class Object: def __init__(self, **kw): self.__dict__.update(kw) # -------------------------------------------------------------------- class ANSIColor: BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) @staticmethod def _hascolors(): if not hasattr(sys.stdout, "isatty"): return False if not sys.stdout.isatty(): return False try: import curses curses.setupterm() return curses.tigetnum("colors") > 2 except: return False @staticmethod def color(txt, color): if ANSIColor.hascolors: return "\x1b[1;%dm%s\x1b[0m" % (30+color, txt) return txt ANSIColor.hascolors = ANSIColor._hascolors() def red (txt): return ANSIColor.color(txt, ANSIColor.RED ) def green(txt): return ANSIColor.color(txt, ANSIColor.GREEN) def rcolor(txt, b): return (green if b else red)(txt) # -------------------------------------------------------------------- def _options(): import configparser as cp from optparse import OptionParser parser = OptionParser() parser.add_option( '', '--bin-args', action = 'append', metavar = 'ARGS', default = [], help = 'append ARGS to EasyCrypt command (cumulative)') parser.add_option( '', '--timeout', action = 'store', default = None, metavar = 'TIMEOUT', type = 'int', help = 'set the timeout option to pass to EasyCrypt') parser.add_option( '', '--jobs', action = 'store', default = 1, metavar = 'JOBS', type = 'int', help = 'number of maximum parallel test jobs') parser.add_option( '', '--timing', action = 'store_true', default = False, help = 'add timing statistics') parser.add_option( '', '--report', action = 'store', default = None, metavar = 'FILE', help = 'dump result to FILE') (cmdopt, args) = parser.parse_args() if len(args) < 1: parser.error('this program takes at least one argument') if cmdopt.timeout: if cmdopt.timeout <= 0: parser.error('timeout must be positive') if cmdopt.jobs <= 0: parse.error('jobs must be positive') options = Object(scenarios = dict()) options.timeout = cmdopt.timeout options.timing = cmdopt.timing options.jobs = cmdopt.jobs options.report = cmdopt.report defaults = dict(args = '', exclude = '', okdirs = '', kodirs = '') config = cp.SafeConfigParser(defaults) config.read(args[0]) def resolve_targets(names): targets = [] for name in names: if name.startswith('!'): targets = filter(lambda x : x != name[1:], targets) else: if name not in targets: targets.append(name) return targets options.bin = config.get('default', 'bin') options.args = config.get('default', 'args').split() options.targets = resolve_targets(args[1:]) if cmdopt.bin_args: options.args.extend(itertools.chain.from_iterable( \ x.split() for x in cmdopt.bin_args)) def _parse_timeout(x): m = re.search(r'^(.*):(\d+)$', x) if m is None: parse.parseerror('invalid timeout: %s' % (x,)) return (m.group(1), int(m.group(2))) for test in [x for x in config.sections() if x.startswith('test-')]: scenario = Object() scenario.args = config.get(test, 'args').split() scenario.okdirs = config.get(test, 'okdirs') scenario.kodirs = config.get(test, 'kodirs') scenario.exclude = config.get(test, 'exclude').split() options.scenarios[test[5:]] = scenario for x in options.targets: if x not in options.scenarios: parser.error('unknown scenario: %s' % (x,)) return options # -------------------------------------------------------------------- def _dump_report(config, results, out): totaltime = sum([x.time for x in results]) grouped = dict() aout = [] for result in results: grouped.setdefault(result.config.group, []).append(result) for gname, group in grouped.items(): ok = [x for x in group if x.success] ko = [x for x in group if not x.success] node = cl.OrderedDict() node['name'] = gname node['hostname'] = config.hostname node['timestamp'] = config.timestamp.isoformat() node['tests'] = len(group) node['failures'] = len(ko) node['time'] = '%.3f' % totaltime node['details'] = [] for result in group: subnode = cl.OrderedDict() name = os.path.basename(result.config.filename) name = os.path.splitext(name)[0] name = '%s (%s)' % (name, result.config.filename) subnode['name'] = name subnode['time'] = '%.3f' % (result.time,) subnode['success'] = result.success subnode['shouldpass'] = result.config.isvalid subnode['output'] = literal_unicode(result.stderr.rstrip('\r\n')) node['details'].append(subnode) aout.append(node) opts = dict(default_flow_style = None, encoding = 'utf-8') out.write(yaml.dump(aout, **opts)) # -------------------------------------------------------------------- TIMING_DIR = 'timing' def _run_test(config, options): logging.info("running ec on `%s' [valid: %s]" % \ (config.filename, config.isvalid)) timestamp = time.time() try: command = [options.bin] + options.args + config.args if options.timeout: command.extend(['-timeout', str(options.timeout)]) if options.timing: tfilename = os.path.join( TIMING_DIR, os.path.splitext(config.filename)[0] + '.stats') os.makedirs(os.path.dirname(tfilename), exist_ok = True) command.extend(['-tstats', tfilename]) command.extend([config.filename]) logging.info('command: %r' % (command,)) process = sp.Popen(command, stdout = sp.PIPE, stderr = sp.PIPE) try: out, err = process.communicate() status = process.poll() finally: try : sp.kill() except: pass except OSError as e: logging.error("cannot run `%s': %s" % (options.bin, e)) exit (1) timestamp = time.time() - timestamp success = (bool(status) != bool(config.isvalid)) out = out.decode('utf-8', errors='ignore') err = err.decode('utf-8', errors='ignore') logging.info("result for `%s': success: %s" % \ (config.filename, rcolor(success, success))) return Object(success = success , config = config , time = timestamp, stderr = err ) # -------------------------------------------------------------------- def _main(): # ------------------------------------------------------------------ options = _options() logfmt = '%(asctime)-15s - %(levelname)s - %(message)s' if options.jobs > 1: logfmt = '%s %s' % ('[%(threadName)s]', logfmt) logging.basicConfig( stream = sys.stderr, level = logging.DEBUG, format = logfmt) # ------------------------------------------------------------------ def gather(obj, scenario): logging.debug("gathering scripts in `%s'" % (obj.src,)) try: scripts = os.listdir(obj.src) except OSError as e: logging.warning("cannot scan `%s': %s" % (obj.src, e)) return [] scripts = sorted([x for x in scripts if re.search(r'\.eca?$', x)]) logging.debug("%.4d script(s) found in `%s'" % (len(scripts), obj.src)) def config(filename): fullname = os.path.join(obj.src, filename) return Object(isvalid = obj.valid, group = obj.src, args = obj.args, filename = fullname) return [config(x) for x in scripts] def is_excluded(src,excludes): for path in excludes: if path.startswith('!'): if src.startswith(path[1:]): return True elif src == path: return True return False def gather_for_scenario(scenario): def expand(dirs): def for1(x): aout = [] if x.startswith('!'): aout.append(x[1:]) for root, dnames, _ in os.walk(x[1:]): aout.extend([os.path.join(root, x) for x in dnames]) else: aout.extend(glob.glob(x)) return aout dirs = [for1(x) for x in re.split(r'\s+', dirs)] return list(itertools.chain.from_iterable(dirs)) dirs = [] dirs.extend([Object(src = x, valid = True , args = scenario.args) \ for x in expand(scenario.okdirs)]) dirs.extend([Object(src = x, valid = False, args = scenario.args) \ for x in expand(scenario.kodirs)]) dirs = [x for x in dirs if not is_excluded(x.src,scenario.exclude)] dirs = map(lambda x : gather(x, scenario), dirs) return list(itertools.chain.from_iterable(dirs)) def gatherall(): dirs = [options.scenarios[x] for x in options.targets] dirs = map(lambda x : gather_for_scenario(x), dirs) return list(itertools.chain.from_iterable(dirs)) allscripts = gatherall() logging.debug("%.4d script(s) in total" % (len(allscripts,))) # -------------------------------------------------------------------- mainconfig = Object() mainconfig.hostname = socket.gethostname() mainconfig.timestamp = datetime.datetime.utcnow() if options.jobs > 1: import concurrent.futures as futures if options.jobs > 1: with futures.ThreadPoolExecutor(options.jobs, thread_name_prefix = 'Thread') \ as executor: result = list(executor.map( lambda config : _run_test(config, options), allscripts)) else: result = [] for config in allscripts: result.append(_run_test(config, options)) errors = [x for x in result if not x.success] nerrs = len(errors) logging.info(rcolor("# of failed scripts: %d" % (nerrs,), nerrs == 0)) if errors: logging.info("--- BEGIN FAILING SCRIPTS ---") for error in errors: logging.info(error.config.filename) logging.info("--- END FAILING SCRIPTS ---") logging.critical("some tests did NOT pass") if options.report is not None: with open(options.report, 'wb') as output: _dump_report(mainconfig, result, output) exit (2 if errors else 0) # -------------------------------------------------------------------- if __name__ == '__main__': _main()