Revision ad797ce7f8c1b6f92c47e21e0cf3798c47134ad7 authored by yan on 11 September 2018, 21:32:40 UTC, committed by yan on 11 September 2018, 21:32:40 UTC
1 parent 3ff1d46
Raw File
test_binaries.py
import io
import unittest
import sys
import shutil
import tempfile
import os
import hashlib
import subprocess
import time
from manticore.binary import Elf, CGCElf

#logging.basicConfig(filename = "test.log",
#                format = "%(asctime)s: %(name)s:%(levelname)s: %(message)s",
#                level = logging.DEBUG)


class TestBinaryPackage(unittest.TestCase):
    _multiprocess_can_split_ = True

    def test_elf(self):
        filename = os.path.join(os.path.dirname(__file__), 'binaries', 'basic_linux_amd64')
        f = Elf(filename)
        self.assertTrue(
            [(4194304, 823262, 'r x', 'tests/binaries/basic_linux_amd64', 0, 823262),
             (7118520, 16112, 'rw ', 'tests/binaries/basic_linux_amd64', 827064, 7320)],
            list(f.maps())
        )
        self.assertTrue([('Running', {'EIP': 4196624})], list(f.threads()))
        self.assertIsNone(f.getInterpreter())

    def test_decree(self):
        filename = os.path.join(os.path.dirname(__file__), 'binaries', 'cadet_decree_x86')
        f = CGCElf(filename)
        self.assertTrue(
            [(134512640, 1478, 'r x', 'tests/binaries/cadet_decree_x86', 0, 1478)],
            list(f.maps())
        )
        self.assertTrue([('Running', {'EIP': 134513708})], list(f.threads()))


class IntegrationTest(unittest.TestCase):
    _multiprocess_can_split_ = True
    def setUp(self):
        # Create a temporary directory
        self.test_dir = tempfile.mkdtemp()

    def tearDown(self):
        # Remove the directory after the test
        shutil.rmtree(self.test_dir)

    def _loadVisitedSet(self, visited):

        self.assertTrue(os.path.exists(visited))
        vitems = open(visited, 'r').read().splitlines()

        vitems = [int(x[2:], 16) for x in vitems]

        return set(vitems)

    def _simple_cli_run(self, filename, contract=None, tx_limit=1):
        """
        Simply run the Manticore command line with `filename`
        :param filename: Name of file inside the `tests/binaries` directory
        :return:
        """
        dirname = os.path.dirname(__file__)
        filename = os.path.join(dirname, 'binaries', filename)
        command = ['python', '-m', 'manticore']

        if contract:
            command.append('--contract')
            command.append(contract)
        command.append('--txlimit')
        command.append(str(tx_limit))
        command.append(filename)

        subprocess.check_call(command, stdout=subprocess.PIPE)

    def _runWithTimeout(self, procargs, logfile, timeout=1200):

        with open(os.path.join(os.pardir, logfile), "w") as output:
            po = subprocess.Popen(procargs, stdout=output)
            secs_used = 0

            while po.poll() is None and secs_used < timeout:
                time.sleep(1)
                sys.stderr.write("~")
                secs_used += 1

            self.assertEqual(po.returncode, 0)
            self.assertTrue(secs_used < timeout)
            sys.stderr.write("\n")

    def testTimeout(self):
        dirname = os.path.dirname(__file__)
        filename = os.path.abspath(os.path.join(dirname, 'binaries', 'arguments_linux_amd64'))
        self.assertTrue(filename.startswith(os.getcwd()))
        filename = filename[len(os.getcwd())+1:]
        workspace = os.path.join(self.test_dir, 'workspace')
        t = time.time()
        with open(os.path.join(os.pardir, self.test_dir, 'output.log'), "w") as output:
            subprocess.check_call(['python', '-m', 'manticore',
                                '--workspace', workspace,
                                '--timeout', '1',
                                '--procs', '4',
                                filename,
                                '+++++++++'], stdout=output)

        self.assertTrue(time.time()-t < 20)

    def test_logger_verbosity(self):
        """
        Tests that default verbosity produces the expected volume of output
        """

        dirname = os.path.dirname(__file__)
        filename = os.path.join(dirname, 'binaries', 'basic_linux_amd64')
        output = subprocess.check_output(['python', '-m', 'manticore', filename])
        output_lines = output.splitlines()
        start_info = output_lines[:2]
        testcase_info = output_lines[2:-5]
        stop_info = output_lines[-5:]

        self.assertEqual(len(start_info), 2)
        self.assertEqual(len(stop_info), 5)

        for line in testcase_info:
            self.assertIn('Generated testcase', line)
    @unittest.skip('sloowww')
    def testArgumentsAssertions(self):
        dirname = os.path.dirname(__file__)
        filename = os.path.abspath(os.path.join(dirname, 'binaries', 'arguments_linux_amd64'))
        self.assertTrue(filename.startswith(os.getcwd()))
        filename = filename[len(os.getcwd())+1:]
        workspace = os.path.join(self.test_dir, 'workspace')
        assertions = os.path.join(self.test_dir, 'assertions.txt')
        open(assertions,'w').write('0x0000000000401003 ZF == 1')
        with open(os.path.join(os.pardir, self.test_dir, 'output.log'), "w") as output:
            subprocess.check_call(['python', '-m', 'manticore',
                                   '--workspace', workspace,
                                   '--proc', '4',
                                   '--assertions', assertions,
                                   filename,
                                   '+++++++++'], stdout=output)
        actual = self._loadVisitedSet(os.path.join(dirname, workspace, 'visited.txt'))
        expected = self._loadVisitedSet(os.path.join(dirname, 'reference', 'arguments_linux_amd64_visited.txt'))
        self.assertGreaterEqual(actual, expected)

    def testDecree(self):
        dirname = os.path.dirname(__file__)
        filename = os.path.abspath(os.path.join(dirname, 'binaries', 'cadet_decree_x86'))
        self.assertTrue(filename.startswith(os.getcwd()))
        filename = filename[len(os.getcwd())+1:]
        workspace = os.path.join(self.test_dir, 'workspace')
        self._runWithTimeout(['python', '-m', 'manticore',
                    '--workspace', workspace,
                    '--timeout', '20',
                    '--proc', '4',
                    '--policy', 'uncovered',
                    filename], os.path.join(self.test_dir, 'output.log'))

        actual = self._loadVisitedSet(os.path.join(dirname, workspace, 'visited.txt'))
        self.assertTrue(len(actual) > 100 )

    def test_eth_regressions(self):
        issues = [
            {'number': 676, 'contract': None, 'txlimit': 1},
            {'number': 678, 'contract': None, 'txlimit': 1},
            {'number': 701, 'contract': None, 'txlimit': 1},
            {'number': 714, 'contract': None, 'txlimit': 1},
            {'number': 735, 'contract': None, 'txlimit': 2},
            {'number': 760, 'contract': None, 'txlimit': 1},
            {'number': 780, 'contract': None, 'txlimit': 1},
            {'number': 795, 'contract': None, 'txlimit': 1},
            {'number': 799, 'contract': 'C', 'txlimit': 1},
            {'number': 807, 'contract': 'C', 'txlimit': 1},
            {'number': 808, 'contract': 'C', 'txlimit': 1},
        ]

        for issue in issues:
            self._simple_cli_run('{}.sol'.format(issue['number']),
                                 contract=issue['contract'], tx_limit=issue['txlimit'])

    def test_eth_705(self):
        # This test needs to run inside tests/binaries because the contract imports a file
        # that is in the tests/binaries dir
        dirname = os.path.dirname(__file__)
        old_cwd = os.getcwd()
        try:
            self._simple_cli_run('705.sol')
        finally:
            os.chdir(old_cwd)

    def test_basic_arm(self):
        dirname = os.path.dirname(__file__)
        filename = os.path.abspath(os.path.join(dirname, 'binaries', 'basic_linux_armv7'))
        workspace = os.path.join(self.test_dir,'workspace') 
        output = subprocess.check_output(['python', '-m', 'manticore', '--workspace', workspace, filename])

        with open(os.path.join(workspace, "test_00000000.stdout")) as f:
            self.assertIn("Message", f.read())
        with open(os.path.join(workspace, "test_00000001.stdout")) as f:
            self.assertIn("Message", f.read())

    def test_brk_regression(self):
        """
        Tests for brk behavior. Source of brk_static_amd64:

	#include <stdio.h>
	#include <unistd.h>
	#include <stdint.h>

	int main(int argc, char *argv[])
	{
	    uint8_t *p = sbrk(0);

	    int valid_at_first = (p == sbrk(16));
	    int valid_after_shift = ((p+16) == sbrk(0));
	    sbrk(-16);
	    int valid_after_reset = (p == sbrk(0));
	    sbrk(-(2<<20));
	    int valid_after_bad_brk = (p == sbrk(0));

	    if (valid_at_first && valid_after_shift 
		    && valid_after_reset && valid_after_bad_brk)
		return 0;
	    else
		return 1;
	}


        """
        dirname = os.path.dirname(__file__)
        filename = os.path.abspath(os.path.join(dirname, 'binaries/brk_static_amd64'))
        workspace = '%s/workspace' % self.test_dir
        output = subprocess.check_output(['python', '-m', 'manticore', '--workspace', workspace, filename])

        with open(os.path.join(workspace, "test_00000000.messages")) as f:
            self.assertIn("finished with exit status: 0", f.read())

if __name__ == '__main__':
    unittest.main()

back to top