#! /usr/bin/env python3
# runtest

# --------------------------------------------------------------------
import sys, os, errno, re, glob, shutil, itertools, logging
import subprocess as sp, time, datetime, socket, io

# --------------------------------------------------------------------
import collections as cl, yaml

class folded_unicode(str):
    """``str`` subclass used as a marker for the YAML folded ('>') style."""

class literal_unicode(str):
    """``str`` subclass used as a marker for the YAML literal ('|') style."""

def folded_unicode_representer(dumper, data):
    """Represent *data* as a YAML string scalar in folded ('>') style.

    *dumper* is the object whose ``represent_scalar`` method builds the node;
    the node it returns is passed back unchanged.
    """
    str_tag = 'tag:yaml.org,2002:str'
    return dumper.represent_scalar(str_tag, data, style='>')
def literal_unicode_representer(dumper, data):
    """Represent *data* as a YAML string scalar in literal ('|') style.

    *dumper* is the object whose ``represent_scalar`` method builds the node;
    the node it returns is passed back unchanged.
    """
    str_tag = 'tag:yaml.org,2002:str'
    return dumper.represent_scalar(str_tag, data, style='|')

# Register the two marker string types with PyYAML so that values wrapped in
# them are emitted with the scalar style chosen by their representer.
yaml.add_representer(folded_unicode , folded_unicode_representer)
yaml.add_representer(literal_unicode, literal_unicode_representer)

# --------------------------------------------------------------------
class Object:
    """Plain attribute bag: each keyword argument becomes an instance attribute."""

    def __init__(self, **kw):
        vars(self).update(kw)

# --------------------------------------------------------------------
class ANSIColor:
    """Helpers for wrapping text in bold ANSI colour escape sequences.

    The class attribute ``hascolors`` is assigned once, immediately after the
    class body, from ``_hascolors()``; ``color()`` consults it on every call.
    """

    # Colour indices; ``color()`` adds 30 to obtain the ANSI foreground code.
    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)

    @staticmethod
    def _hascolors():
        """Return True when sys.stdout is a tty that reports more than 2 colours.

        Any failure while importing or querying ``curses`` is treated as
        "no colours".
        """
        if not hasattr(sys.stdout, "isatty"):
            return False
        if not sys.stdout.isatty():
            return False

        try:
            import curses

            curses.setupterm()
            return curses.tigetnum("colors") > 2
        except Exception:
            # Deliberately broad (ImportError, curses.error, missing fileno,
            # ...), but no longer a bare ``except`` so that KeyboardInterrupt
            # and SystemExit are not swallowed.
            return False

    @staticmethod
    def color(txt, color):
        """Return *txt* wrapped in the bold escape for *color*, or unchanged.

        *color* is one of the class colour indices. *txt* is returned as is
        when ``ANSIColor.hascolors`` is false.
        """
        if ANSIColor.hascolors:
            return "\x1b[1;%dm%s\x1b[0m" % (30+color, txt)
        return txt

# Probe the terminal once at import time; the result is reused by color().
ANSIColor.hascolors = ANSIColor._hascolors()

def red(txt):
    """Return *txt* passed through ``ANSIColor.color`` with the RED index."""
    return ANSIColor.color(txt, ANSIColor.RED)
def green(txt):
    """Return *txt* passed through ``ANSIColor.color`` with the GREEN index."""
    return ANSIColor.color(txt, ANSIColor.GREEN)

def rcolor(txt, b):
    """Colour *txt* with ``green`` when *b* is truthy, with ``red`` otherwise."""
    if b:
        return green(txt)
    return red(txt)

# --------------------------------------------------------------------
def _options():
    """Parse the command line and the INI configuration file.

    Positional arguments are ``CONFIG [TARGET ...]``: the first one is the
    configuration file, the remaining ones are scenario names. A target
    written ``!NAME`` removes ``NAME`` from the targets collected so far.

    The configuration file must provide a ``[default]`` section with a
    ``bin`` entry (and optionally ``args``); every ``[test-NAME]`` section
    defines the scenario ``NAME`` through its ``args``, ``okdirs``,
    ``kodirs`` and ``exclude`` entries (all defaulting to the empty string).

    Returns an ``Object`` with the attributes ``scenarios`` (dict mapping a
    scenario name to an ``Object`` with ``args``, ``okdirs``, ``kodirs``,
    ``exclude``), ``timeout``, ``timing``, ``jobs``, ``report``, ``bin``,
    ``args`` and ``targets``.

    Invalid command lines are reported through ``parser.error``, which
    prints a message and exits the process.
    """
    import configparser as cp
    from optparse import OptionParser

    parser = OptionParser()

    parser.add_option(
        '', '--bin-args',
        action  = 'append',
        metavar = 'ARGS',
        default = [],
        help    = 'append ARGS to EasyCrypt command (cumulative)')

    parser.add_option(
        '', '--timeout',
        action  = 'store',
        default = None,
        metavar = 'TIMEOUT',
        type    = 'int',
        help    = 'set the timeout option to pass to EasyCrypt')

    parser.add_option(
        '', '--jobs',
        action  = 'store',
        default = 1,
        metavar = 'JOBS',
        type    = 'int',
        help    = 'number of maximum parallel test jobs')

    parser.add_option(
        '', '--timing',
        action  = 'store_true',
        default = False,
        help    = 'add timing statistics')

    parser.add_option(
        '', '--report',
        action  = 'store',
        default = None,
        metavar = 'FILE',
        help    = 'dump result to FILE')

    (cmdopt, args) = parser.parse_args()

    if len(args) < 1:
        parser.error('this program takes at least one argument')

    # Compare against None explicitly: a plain truthiness test would let
    # `--timeout 0` slip through the positivity check.
    if cmdopt.timeout is not None and cmdopt.timeout <= 0:
        parser.error('timeout must be positive')

    if cmdopt.jobs <= 0:
        parser.error('jobs must be positive')

    options = Object(scenarios = dict())
    options.timeout  = cmdopt.timeout
    options.timing   = cmdopt.timing
    options.jobs     = cmdopt.jobs
    options.report   = cmdopt.report

    defaults = dict(args = '', exclude = '', okdirs = '', kodirs = '')

    # SafeConfigParser was a deprecated alias of ConfigParser and has been
    # removed in Python 3.12.
    config = cp.ConfigParser(defaults)
    config.read(args[0])

    def resolve_targets(names):
        """Return the ordered, duplicate-free targets; `!x` drops `x`."""
        targets = []
        for name in names:
            if name.startswith('!'):
                # Build a real list: filter() is a lazy iterator in Python 3
                # and would break the later membership test and append().
                dropped = name[1:]
                targets = [x for x in targets if x != dropped]
            elif name not in targets:
                targets.append(name)
        return targets

    options.bin     = config.get('default', 'bin')
    options.args    = config.get('default', 'args').split()
    options.targets = resolve_targets(args[1:])

    if cmdopt.bin_args:
        options.args.extend(itertools.chain.from_iterable(
            x.split() for x in cmdopt.bin_args))

    # NOTE(review): this helper is not called anywhere in this function; it
    # is kept (with its error reporting fixed) in case it is still wanted.
    def _parse_timeout(x):
        """Split `NAME:SECONDS` into `(NAME, int(SECONDS))`."""
        m = re.search(r'^(.*):(\d+)$', x)
        if m is None:
            parser.error('invalid timeout: %s' % (x,))
        return (m.group(1), int(m.group(2)))

    for test in [x for x in config.sections() if x.startswith('test-')]:
        scenario = Object()
        scenario.args    = config.get(test, 'args').split()
        scenario.okdirs  = config.get(test, 'okdirs')
        scenario.kodirs  = config.get(test, 'kodirs')
        scenario.exclude = config.get(test, 'exclude').split()
        # Strip the leading 'test-' to obtain the scenario name.
        options.scenarios[test[5:]] = scenario

    for x in options.targets:
        if x not in options.scenarios:
            parser.error('unknown scenario: %s' % (x,))

    return options

# --------------------------------------------------------------------
def _dump_report(config, results, out):
    """Write a YAML report of *results*, one node per test group, to *out*.

    *config* supplies ``hostname`` and ``timestamp`` (an object with an
    ``isoformat()`` method). Each element of *results* must provide
    ``success``, ``time``, ``stderr`` and a ``config`` with ``group``,
    ``filename`` and ``isvalid``. The document is produced by ``yaml.dump``
    with ``encoding = 'utf-8'`` and handed to ``out.write`` in one call
    (the caller in this file opens *out* in binary mode).

    Every group node carries its own test count, failure count and
    cumulated time, followed by one detail entry per test.
    """
    grouped = dict()
    for result in results:
        grouped.setdefault(result.config.group, []).append(result)

    aout = []
    for gname, group in grouped.items():
        failures  = [x for x in group if not x.success]
        # Time spent in this group only, consistent with the per-group
        # 'tests' and 'failures' counters.
        grouptime = sum(x.time for x in group)
        node      = cl.OrderedDict()

        node['name']      = gname
        node['hostname']  = config.hostname
        node['timestamp'] = config.timestamp.isoformat()
        node['tests']     = len(group)
        node['failures']  = len(failures)
        node['time']      = '%.3f' % grouptime
        node['details']   = []

        for result in group:
            subnode = cl.OrderedDict()

            # Displayed as "<basename without extension> (<full path>)".
            name = os.path.basename(result.config.filename)
            name = os.path.splitext(name)[0]
            name = '%s (%s)' % (name, result.config.filename)

            subnode['name']       = name
            subnode['time']       = '%.3f' % (result.time,)
            subnode['success']    = result.success
            subnode['shouldpass'] = result.config.isvalid
            # literal_unicode makes the captured stderr a '|' block scalar.
            subnode['output']     = literal_unicode(result.stderr.rstrip('\r\n'))

            node['details'].append(subnode)

        aout.append(node)

    opts = dict(default_flow_style = None, encoding = 'utf-8')

    out.write(yaml.dump(aout, **opts))

# --------------------------------------------------------------------
# Directory under which the per-script `.stats` files are written when
# timing is enabled.
TIMING_DIR = 'timing'

def _run_test(config, options):
    """Run the EasyCrypt binary on one script and report the outcome.

    *config* describes the script (``filename``, ``args``, ``isvalid``);
    *options* supplies ``bin``, ``args``, ``timeout`` and ``timing``.
    The command line is ``bin + options.args + config.args``, optionally
    followed by ``-timeout N`` and ``-tstats FILE``, then the script name.

    A run is successful when the exit status matches the expectation:
    zero for a script that should pass, non-zero for one that should fail.

    Returns an ``Object`` with ``success``, ``config``, ``time`` (elapsed
    wall-clock seconds) and ``stderr`` (decoded text). If the command cannot
    be prepared or started (``OSError``), the error is logged and the
    process exits with status 1.
    """
    logging.info("running ec on `%s' [valid: %s]" % \
                     (config.filename, config.isvalid))

    started = time.time()
    try:
        command = [options.bin] + options.args + config.args
        if options.timeout:
            command.extend(['-timeout', str(options.timeout)])
        if options.timing:
            tfilename = os.path.join(
                TIMING_DIR, os.path.splitext(config.filename)[0] + '.stats')
            os.makedirs(os.path.dirname(tfilename), exist_ok = True)
            command.extend(['-tstats', tfilename])

        command.extend([config.filename])

        logging.info('command: %r' % (command,))
        process = sp.Popen(command, stdout = sp.PIPE, stderr = sp.PIPE)

        try:
            _out, err = process.communicate()
            status    = process.poll()
        finally:
            # Only reached with a live child when communicate() was
            # interrupted: kill it and reap it so it does not linger.
            if process.poll() is None:
                try:
                    process.kill()
                except OSError:
                    pass            # the child exited in the meantime
                process.wait()

    except OSError as e:
        logging.error("cannot run `%s': %s" % (options.bin, e))
        sys.exit(1)

    elapsed = time.time() - started
    # A non-zero status means the run failed; that is a success exactly
    # when the script was expected to be rejected.
    success = (bool(status) != bool(config.isvalid))
    err     = err.decode('utf-8', errors='ignore')

    logging.info("result for `%s': success: %s" % \
                     (config.filename, rcolor(success, success)))

    return Object(success = success,
                  config  = config ,
                  time    = elapsed,
                  stderr  = err    )

# --------------------------------------------------------------------
def _main():
    """Collect the scripts of the requested scenarios, run them, report.

    Steps: parse options, configure logging, gather every `.ec` / `.eca`
    script from the ok/ko directories of each target scenario, run them
    (in a thread pool when ``--jobs`` is greater than 1), log the failing
    scripts, optionally dump the YAML report, and exit with status 2 when
    at least one script did not behave as expected (0 otherwise).
    """
    # ------------------------------------------------------------------
    options = _options()

    logfmt = '%(asctime)-15s - %(levelname)s - %(message)s'
    if options.jobs > 1:
        logfmt = '%s %s' % ('[%(threadName)s]', logfmt)

    logging.basicConfig(
        stream = sys.stderr,
        level  = logging.DEBUG,
        format = logfmt)

    # ------------------------------------------------------------------
    def gather(obj, scenario):
        """Return one test config per `.ec`/`.eca` file found in `obj.src`."""
        logging.debug("gathering scripts in `%s'" % (obj.src,))
        try:
            scripts = os.listdir(obj.src)
        except OSError as e:
            logging.warning("cannot scan `%s': %s" % (obj.src, e))
            return []
        scripts = sorted([x for x in scripts if re.search(r'\.eca?$', x)])
        logging.debug("%.4d script(s) found in `%s'" % (len(scripts), obj.src))

        def config(filename):
            fullname = os.path.join(obj.src, filename)
            return Object(isvalid  = obj.valid,
                          group    = obj.src,
                          args     = obj.args,
                          filename = fullname)

        return [config(x) for x in scripts]

    def is_excluded(src, excludes):
        """`!prefix` entries exclude by prefix, other entries by equality."""
        for path in excludes:
            if path.startswith('!'):
                if src.startswith(path[1:]):
                    return True
            elif src == path:
                return True
        return False

    def gather_for_scenario(scenario):
        """Return the test configs of every non-excluded dir of `scenario`."""
        def expand(dirs):
            def for1(x):
                aout = []
                if x.startswith('!'):
                    # `!dir` means dir itself plus all of its sub-directories.
                    aout.append(x[1:])
                    for root, dnames, _ in os.walk(x[1:]):
                        aout.extend([os.path.join(root, d) for d in dnames])
                else:
                    aout.extend(glob.glob(x))
                return aout

            dirs = [for1(x) for x in re.split(r'\s+', dirs)]
            return list(itertools.chain.from_iterable(dirs))

        dirs = []
        dirs.extend([Object(src = x, valid = True , args = scenario.args) \
                         for x in expand(scenario.okdirs)])
        dirs.extend([Object(src = x, valid = False, args = scenario.args) \
                         for x in expand(scenario.kodirs)])
        dirs = [x for x in dirs if not is_excluded(x.src, scenario.exclude)]
        found = [gather(x, scenario) for x in dirs]
        return list(itertools.chain.from_iterable(found))

    def gatherall():
        """Return the test configs of all target scenarios, in order."""
        scenarios = [options.scenarios[x] for x in options.targets]
        found     = [gather_for_scenario(x) for x in scenarios]
        return list(itertools.chain.from_iterable(found))

    allscripts = gatherall()

    logging.debug("%.4d script(s) in total" % (len(allscripts),))

    # --------------------------------------------------------------------
    mainconfig = Object()

    mainconfig.hostname  = socket.gethostname()
    # Naive UTC timestamp, as datetime.utcnow() returned, without relying on
    # that deprecated call; isoformat() output keeps the same shape.
    mainconfig.timestamp = \
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo = None)

    if options.jobs > 1:
        import concurrent.futures as futures

        with futures.ThreadPoolExecutor(options.jobs, thread_name_prefix = 'Thread') \
             as executor:

            result = list(executor.map(
                lambda config : _run_test(config, options),
                allscripts))

    else:
        result = [_run_test(config, options) for config in allscripts]

    errors = [x for x in result if not x.success]
    nerrs  = len(errors)

    logging.info(rcolor("# of failed scripts: %d" % (nerrs,), nerrs == 0))
    if errors:
        logging.info("--- BEGIN FAILING SCRIPTS ---")
        for error in errors:
            logging.info(error.config.filename)
        logging.info("--- END FAILING SCRIPTS ---")
        logging.critical("some tests did NOT pass")

    if options.report is not None:
        # Binary mode: _dump_report writes the encoded output of yaml.dump.
        with open(options.report, 'wb') as output:
            _dump_report(mainconfig, result, output)

    # sys.exit rather than the site-provided `exit` helper, which is not
    # guaranteed to exist (e.g. under `python -S`).
    sys.exit(2 if errors else 0)

# --------------------------------------------------------------------
# Run the test driver only when this file is executed as a script.
if __name__ == '__main__':
    _main()
# end of runtest