Revision 8216663a5c88968a62f67e4aa80807167efceb8d authored by Michael Tokarev on 24 April 2024, 03:03:52 UTC, committed by Michael Tokarev on 24 April 2024, 03:03:52 UTC
Signed-off-by: Michael Tokarev <mjt@tls.msk.ru>
1 parent 51da750
Raw File
runner.py
#!/usr/bin/env python3

# Tool for running fuzz tests
#
# Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import sys
import os
import signal
import subprocess
import random
import shutil
from itertools import count
import time
import getopt
import io
import resource

try:
    import json
except ImportError:
    try:
        import simplejson as json
    except ImportError:
        print("Warning: Module for JSON processing is not found.\n" \
            "'--config' and '--command' options are not supported.", file=sys.stderr)

# Backing file sizes in MB
MAX_BACKING_FILE_SIZE = 10
MIN_BACKING_FILE_SIZE = 1


def multilog(msg, *output):
    """ Write an object to all of specified file descriptors."""
    for fd in output:
        fd.write(msg)
        fd.flush()


def str_signal(sig):
    """ Convert a numeric value of a system signal to the string one
    defined by the current operational system.
    """
    for k, v in signal.__dict__.items():
        if v == sig:
            return k


def run_app(fd, q_args):
    """Start an application with specified arguments and return its exit code
    or kill signal depending on the result of execution.
    """

    class Alarm(Exception):
        """Exception for signal.alarm events."""
        pass

    def handler(*args):
        """Notify that an alarm event occurred."""
        raise Alarm

    signal.signal(signal.SIGALRM, handler)
    signal.alarm(600)
    term_signal = signal.SIGKILL
    devnull = open('/dev/null', 'r+')
    process = subprocess.Popen(q_args, stdin=devnull,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               errors='replace')
    try:
        out, err = process.communicate()
        signal.alarm(0)
        fd.write(out)
        fd.write(err)
        fd.flush()
        return process.returncode

    except Alarm:
        os.kill(process.pid, term_signal)
        fd.write('The command was terminated by timeout.\n')
        fd.flush()
        return -term_signal


class TestException(Exception):
    """Exception for errors risen by TestEnv objects."""
    pass


class TestEnv(object):

    """Test object.

    The class sets up test environment, generates backing and test images
    and executes application under tests with specified arguments and a test
    image provided.

    All logs are collected.

    The summary log will contain short descriptions and statuses of tests in
    a run.

    The test log will include application (e.g. 'qemu-img') logs besides info
    sent to the summary log.
    """

    def __init__(self, test_id, seed, work_dir, run_log,
                 cleanup=True, log_all=False):
        """Set test environment in a specified work directory.

        Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
        'QEMU_IO' environment variables.
        """
        if seed is not None:
            self.seed = seed
        else:
            self.seed = str(random.randint(0, sys.maxsize))
        random.seed(self.seed)

        self.init_path = os.getcwd()
        self.work_dir = work_dir
        self.current_dir = os.path.join(work_dir, 'test-' + test_id)
        self.qemu_img = \
            os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ')
        self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ')
        self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
                         ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
                         ['qemu-io', '$test_img', '-c', 'read $off $len'],
                         ['qemu-io', '$test_img', '-c', 'write $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_read $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_write $off $len'],
                         ['qemu-io', '$test_img', '-c', 'flush'],
                         ['qemu-io', '$test_img', '-c',
                          'discard $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'truncate $off']]
        for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']:
            self.commands.append(
                ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
                 '$test_img', 'converted_image.' + fmt])

        try:
            os.makedirs(self.current_dir)
        except OSError as e:
            print("Error: The working directory '%s' cannot be used. Reason: %s"\
                % (self.work_dir, e.strerror), file=sys.stderr)
            raise TestException
        self.log = open(os.path.join(self.current_dir, "test.log"), "w")
        self.parent_log = open(run_log, "a")
        self.failed = False
        self.cleanup = cleanup
        self.log_all = log_all

    def _create_backing_file(self):
        """Create a backing file in the current directory.

        Return a tuple of a backing file name and format.

        Format of a backing file is randomly chosen from all formats supported
        by 'qemu-img create'.
        """
        # All formats supported by the 'qemu-img create' command.
        backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2',
                                          'file', 'qed', 'vpc'])
        backing_file_name = 'backing_img.' + backing_file_fmt
        backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
                                           MAX_BACKING_FILE_SIZE) * (1 << 20)
        cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
                               backing_file_name, str(backing_file_size)]
        temp_log = io.StringIO()
        retcode = run_app(temp_log, cmd)
        if retcode == 0:
            temp_log.close()
            return (backing_file_name, backing_file_fmt)
        else:
            multilog("Warning: The %s backing file was not created.\n\n"
                     % backing_file_fmt, sys.stderr, self.log, self.parent_log)
            self.log.write("Log for the failure:\n" + temp_log.getvalue() +
                           '\n\n')
            temp_log.close()
            return (None, None)

    def execute(self, input_commands=None, fuzz_config=None):
        """ Execute a test.

        The method creates backing and test images, runs test app and analyzes
        its exit status. If the application was killed by a signal, the test
        is marked as failed.
        """
        if input_commands is None:
            commands = self.commands
        else:
            commands = input_commands

        os.chdir(self.current_dir)
        backing_file_name, backing_file_fmt = self._create_backing_file()
        img_size = image_generator.create_image(
            'test.img', backing_file_name, backing_file_fmt, fuzz_config)
        for item in commands:
            shutil.copy('test.img', 'copy.img')
            # 'off' and 'len' are multiple of the sector size
            sector_size = 512
            start = random.randrange(0, img_size + 1, sector_size)
            end = random.randrange(start, img_size + 1, sector_size)

            if item[0] == 'qemu-img':
                current_cmd = list(self.qemu_img)
            elif item[0] == 'qemu-io':
                current_cmd = list(self.qemu_io)
            else:
                multilog("Warning: test command '%s' is not defined.\n"
                         % item[0], sys.stderr, self.log, self.parent_log)
                continue
            # Replace all placeholders with their real values
            for v in item[1:]:
                c = (v
                     .replace('$test_img', 'copy.img')
                     .replace('$off', str(start))
                     .replace('$len', str(end - start)))
                current_cmd.append(c)

            # Log string with the test header
            test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
                           "Backing file: %s\n" \
                           % (self.seed, " ".join(current_cmd),
                              self.current_dir, backing_file_name)
            temp_log = io.StringIO()
            try:
                retcode = run_app(temp_log, current_cmd)
            except OSError as e:
                multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
                         % (test_summary, os.path.basename(current_cmd[0]),
                            e.strerror),
                         sys.stderr, self.log, self.parent_log)
                raise TestException

            if retcode < 0:
                self.log.write(temp_log.getvalue())
                multilog("%sFAIL: Test terminated by signal %s\n\n"
                         % (test_summary, str_signal(-retcode)),
                         sys.stderr, self.log, self.parent_log)
                self.failed = True
            else:
                if self.log_all:
                    self.log.write(temp_log.getvalue())
                    multilog("%sPASS: Application exited with the code " \
                             "'%d'\n\n" % (test_summary, retcode),
                             sys.stdout, self.log, self.parent_log)
            temp_log.close()
            os.remove('copy.img')

    def finish(self):
        """Restore the test environment after a test execution."""
        self.log.close()
        self.parent_log.close()
        os.chdir(self.init_path)
        if self.cleanup and not self.failed:
            shutil.rmtree(self.current_dir)

if __name__ == '__main__':

    def usage():
        print("""
        Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR

        Set up test environment in TEST_DIR and run a test in it. A module for
        test image generation should be specified via IMG_GENERATOR.

        Example:
          runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2

        Optional arguments:
          -h, --help                    display this help and exit
          -d, --duration=NUMBER         finish tests after NUMBER of seconds
          -c, --command=JSON            run tests for all commands specified in
                                        the JSON array
          -s, --seed=STRING             seed for a test image generation,
                                        by default will be generated randomly
          --config=JSON                 take fuzzer configuration from the JSON
                                        array
          -k, --keep_passed             don't remove folders of passed tests
          -v, --verbose                 log information about passed tests

        JSON:

        '--command' accepts a JSON array of commands. Each command presents
        an application under test with all its parameters as a list of strings,
        e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].

        Supported application aliases: 'qemu-img' and 'qemu-io'.

        Supported argument aliases: $test_img for the fuzzed image, $off
        for an offset, $len for length.

        Values for $off and $len will be generated based on the virtual disk
        size of the fuzzed image.

        Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        '--config' accepts a JSON array of fields to be fuzzed, e.g.
        '[["header"], ["header", "version"]]'.

        Each of the list elements can consist of a complex image element only
        as ["header"] or ["feature_name_table"] or an exact field as
        ["header", "version"]. In the first case random portion of the element
        fields will be fuzzed, in the second one the specified field will be
        fuzzed always.

        If '--config' argument is specified, fields not listed in
        the configuration array will not be fuzzed.
        """)

    def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
                 command, fuzz_config):
        """Setup environment for one test and execute this test."""
        try:
            test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
                           log_all)
        except TestException:
            sys.exit(1)

        # Python 2.4 doesn't support 'finally' and 'except' in the same 'try'
        # block
        try:
            try:
                test.execute(command, fuzz_config)
            except TestException:
                sys.exit(1)
        finally:
            test.finish()

    def should_continue(duration, start_time):
        """Return True if a new test can be started and False otherwise."""
        current_time = int(time.time())
        return (duration is None) or (current_time - start_time < duration)

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
                                       ['command=', 'help', 'seed=', 'config=',
                                        'keep_passed', 'verbose', 'duration='])
    except getopt.error as e:
        print("Error: %s\n\nTry 'runner.py --help' for more information" % e, file=sys.stderr)
        sys.exit(1)

    command = None
    cleanup = True
    log_all = False
    seed = None
    config = None
    duration = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-c', '--command'):
            try:
                command = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array of test commands cannot be loaded.\n" \
                    "Reason: %s" % e, file=sys.stderr)
                sys.exit(1)
        elif opt in ('-k', '--keep_passed'):
            cleanup = False
        elif opt in ('-v', '--verbose'):
            log_all = True
        elif opt in ('-s', '--seed'):
            seed = arg
        elif opt in ('-d', '--duration'):
            duration = int(arg)
        elif opt == '--config':
            try:
                config = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array with the fuzzer configuration cannot" \
                    " be loaded\nReason: %s" % e, file=sys.stderr)
                sys.exit(1)

    if not len(args) == 2:
        print("Expected two parameters\nTry 'runner.py --help'" \
            " for more information.", file=sys.stderr)
        sys.exit(1)

    work_dir = os.path.realpath(args[0])
    # run_log is created in 'main', because multiple tests are expected to
    # log in it
    run_log = os.path.join(work_dir, 'run.log')

    # Add the path to the image generator module to sys.path
    sys.path.append(os.path.realpath(os.path.dirname(args[1])))
    # Remove a script extension from image generator module if any
    generator_name = os.path.splitext(os.path.basename(args[1]))[0]

    try:
        image_generator = __import__(generator_name)
    except ImportError as e:
        print("Error: The image generator '%s' cannot be imported.\n" \
            "Reason: %s" % (generator_name, e), file=sys.stderr)
        sys.exit(1)

    # Enable core dumps
    resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
    # If a seed is specified, only one test will be executed.
    # Otherwise runner will terminate after a keyboard interruption
    start_time = int(time.time())
    test_id = count(1)
    while should_continue(duration, start_time):
        try:
            run_test(str(next(test_id)), seed, work_dir, run_log, cleanup,
                     log_all, command, config)
        except (KeyboardInterrupt, SystemExit):
            sys.exit(1)

        if seed is not None:
            break
back to top