https://github.com/NobleMathews/FuzzSliceICSE
Tip revision: f7e3861d1127ca45aca7cf256ea8b39dab673095 authored by ashamedbit on 22 January 2024, 04:40:09 UTC
Uncomment speed warnings
Uncomment speed warnings
Tip revision: f7e3861
fuzz.py
import glob
import linecache
import os
import re
import shlex
import shutil
import subprocess
import sys
import threading
import time
from multiprocessing import Pool, process
from pathlib import Path
import yaml
import timeout_decorator
import psutil
from loguru import logger
# Read the run configuration once at import time from config.yaml in the
# current working directory; the names below are used throughout this module.
with open("config.yaml", "r") as f:
    config = yaml.safe_load(f)

# How long the fuzzer runs (handed to the fuzzers' time-limit options)
timeout = config["timeout"]

# Upper bound on the length of generated fuzz inputs
max_length_fuzz_bytes = config["max_length_fuzz_bytes"]

# Whether issues are processed through a multiprocessing pool
parallel_execution = config["parallel_execution"]

# Crash-file limit consulted when replaying libFuzzer crashes
crash_limit = config["crash_limit"]

# Argument of the timeout decorator wrapped around the whole fuzzing run
hard_timeout = config["hard_timeout"]
# Deprecated - please use libfuzz until this is re-enabled
# Namespace of static helpers that build the afl-fuzz command line, replay AFL
# crash files and collect afl-cov coverage results.
class Aflfuzz:
# Per-test-case timeout in milliseconds, passed to afl-fuzz through "-t"
test_timeout = 1000
# Command name used for the fuzzer when the afl-fuzz command line is built
afl_fuzz_path = "afl-fuzz"
# Command name used for the coverage tool when the afl-cov command line is built
afl_cov_path = "afl-cov"
@staticmethod
def construct_fuzz_command(file, inpath, outpath):
    """Assemble the afl-fuzz shell command for one target binary.

    Args:
        file: Path of the binary to fuzz.
        inpath: Directory holding the seed inputs (``-i``).
        outpath: Directory afl-fuzz writes its findings to (``-o``).

    Returns:
        ``[command, env, cwd]`` where ``command`` is a single shell string,
        ``env`` is ``os.environ`` itself and ``cwd`` is ``"."``.
    """
    pieces = [
        "AFL_SKIP_CPUFREQ=1",
        Aflfuzz.afl_fuzz_path,
        "-t",
        str(Aflfuzz.test_timeout),
        "-i",
        inpath,
        "-V",
        str(Fuzzer.TIMEOUT),
        "-C -o",
        outpath,
        file,
    ]
    command = " ".join(pieces)

    # The process environment is modified in place (not copied), so the
    # variable stays set for every later child process as well.
    environment = os.environ
    environment["AFL_SKIP_CPUFREQ"] = "1"

    return [command, environment, "."]
@staticmethod
def check_crashes(print_lines, file, outpath):
if not (os.path.isfile(file[:-3] + "cov")):
print_lines += ["No ASAN compiled binary"]
print_lines += [
"===============================CRASH ANALYSIS ENDS================================="
]
return
dir = os.path.join(outpath, "crashes")
count = 0
is_confirmed = 0
for path in os.listdir(dir):
# There is a README file which needs to be not counted!
if path == "README.txt":
continue
# check if current path is a file
crashpath = os.path.join(dir, path)
if os.path.isfile(crashpath):
f = open(crashpath, "rb")
p = subprocess.Popen(
[file[:-3] + "cov"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
p.stdin.write(f.read())
p.stdin.close()
p.wait()
asan_output = p.stderr.read().decode("utf-8")
[print_lines, crash_line] = Fuzzer.identify_crash_line(
print_lines, asan_output, file
)
[print_lines, ret] = Fuzzer.confirm_crash_warning(
print_lines, file, crash_line
)
if ret == 1:
is_confirmed = 1
f.close()
count += 1
return [print_lines, is_confirmed, count]
@staticmethod
def generate_coverage(print_lines, file, outpath):
print_lines += [
"===============================COVERAGE ANALYSIS BEGINS================================="
]
coverage_bin = file[:-3] + "cov"
if not (os.path.isfile(coverage_bin)):
print_lines += ["No ASAN compiled binary"]
print_lines += [
"===============================COVERAGE ANALYSIS ENDS================================="
]
return
code_dir = os.path.abspath(os.path.join(file, os.pardir))
command = (
Aflfuzz.afl_cov_path
+ " -d "
+ outpath
+ ' --code-dir . --enable-branch-coverage --overwrite --coverage-cmd "'
+ coverage_bin
+ ' < AFL_FILE"'
)
args = shlex.split(command)
p = subprocess.Popen(
args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=code_dir
)
p.wait()
output = p.stdout.read().decode("utf-8")
if "ERROR" in output:
print_lines += [
"Coverage results did not generate. Maybe it is because of stack smashing"
]
print_lines += [args]
print_lines += [output]
print_lines += [
"===============================COVERAGE ANALYSIS ENDS================================="
]
return print_lines
if "Could not find any" in output:
print_lines += ["gcov file not present"]
print_lines += [output]
print_lines += [
"===============================COVERAGE ANALYSIS ENDS================================="
]
return print_lines
print_lines += ["Coverage results generated!"]
print_lines = Aflfuzz.print_coverage(print_lines, file, outpath)
print_lines += [
"===============================COVERAGE ANALYSIS ENDS================================="
]
return print_lines
@staticmethod
def print_coverage(print_lines, file, outpath):
web_report_dir = os.path.join(
outpath,
"cov",
"web",
"test_files",
os.path.basename(file)[:-3] + "c.gcov.html",
)
f = open(web_report_dir, "rb")
output = f.read().decode("utf-8")
lines = output.split("\n")
results = []
for line in lines:
if '<td class="headerCovTableEntry">' in line:
m = re.findall('<td class="headerCovTableEntry">([\d\/\s]+)<', line)
if m:
results += [m[0]]
print_lines += ["The Line coverage is : " + results[0] + "/" + results[1]]
print_lines += ["The Function coverage is : " + results[2] + "/" + results[3]]
print_lines += ["The Branch coverage is : " + results[4] + "/" + results[5]]
return print_lines
# Namespace of static helpers that build the fuzz command for the target
# binary, replay its "crash-" files and summarise llvm-cov coverage output.
class Libfuzz:
# Crash limit (module-level crash_limit) consulted by check_crashes before it replays another crash file
too_many_crashes = crash_limit
@staticmethod
def construct_fuzz_command(file, inpath, outpath):
    """Assemble the shell command that runs the fuzz target binary.

    Args:
        file: Path of the fuzz target; it is the executable of the command.
        inpath: Corpus directory, passed as the trailing argument.
        outpath: Working directory for the run.

    Returns:
        ``[command, env, cwd]`` where ``command`` is a single shell string,
        ``env`` is ``os.environ`` itself and ``cwd`` is ``outpath``.
    """
    command = "".join(
        [
            file,
            " -fork=2 -ignore_crashes=1 -max_len=",
            str(Fuzzer.max_len),
            " -detect_leaks=0 -len_control=0 -malloc_limit_mb=204800 -timeout=10 -rss_limit_mb=204800 -max_total_time=",
            str(Fuzzer.TIMEOUT),
            " ",
            inpath,
        ]
    )

    # The process environment is modified in place (not copied): child
    # processes started later without an explicit env inherit this value too.
    environment = os.environ
    library_dir = "./test_lib/" + Fuzzer.test_library + "/build_ss"
    environment["LD_LIBRARY_PATH"] = os.path.abspath(library_dir)

    return [command, environment, outpath]
@staticmethod
def check_crashes(print_lines, file, outpath):
dir = outpath
count = 0
is_confirmed = 0
for path in os.listdir(dir):
if os.path.isfile(os.path.join(dir, path)):
if path.startswith("crash-"):
if count > Libfuzz.too_many_crashes:
# Too many crashes! Stop!!
break
count = count + 1
print_lines += ["crashfile: " + path]
p = subprocess.run(
[file, os.path.join(dir, path)],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
asan_output = p.stderr.decode("utf-8")
[print_lines, crash_line] = Fuzzer.identify_crash_line(
print_lines, asan_output, file
)
[print_lines, ret] = Fuzzer.confirm_crash_warning(
print_lines, file, crash_line
)
if ret == 1:
is_confirmed = 1
return [print_lines, is_confirmed, count]
@staticmethod
def print_coverage(file, print_lines, coverage_output, fuzz_data=None):
if fuzz_data is None:
fuzz_data = {}
file_root = os.path.splitext(os.path.basename(file))[0] + "."
count = 0
count_not_covered = 0
count_impossible_to_cover = 0
target_line = ""
# Assume target line is covered until later disproved
target_line_covered = 1
lines = coverage_output.split("\n")
print_lines += ["Coverage for target file : "]
print_status = 0
for line in lines:
# Print only lines from coverage within target file or else stdout is cluttered!
if (file_root in line) or print_status:
if "Unexecuted instantiation" not in line:
print_lines += [line]
print_status = 1
if line.strip() == "":
print_status = 0
match_all_lines = re.search("\d\|.*\|", line)
if match_all_lines:
count = count + 1
match_zero_lines = re.search("\d\| 0\|", line)
if match_zero_lines:
count_not_covered = count_not_covered + 1
match_cannot_cover_lines = re.search("\d\| \|", line)
if match_cannot_cover_lines:
count_impossible_to_cover = count_impossible_to_cover + 1
if "/*target_line*/" in line:
target_line = line
if match_zero_lines:
target_line_covered = 0
fuzz_data["target_line_hit"] = 0
else:
fuzz_data["target_line_hit"] = line.split("|")[1].strip()
print_lines += ["The target line is: " + str(target_line)]
if target_line_covered:
print_lines += ["Target is covered"]
else:
print_lines += ["Target is not covered"]
coverage_ratio = (count - count_not_covered - count_impossible_to_cover) / (count - count_impossible_to_cover)
fuzz_data["coverage_ratio"] = coverage_ratio
print_lines += [
"The Line coverage is : "
+ str(count - count_not_covered - count_impossible_to_cover)
+ "/"
+ str(count - count_impossible_to_cover)
+ " = "
+ str(coverage_ratio)
]
return [target_line_covered, print_lines]
@staticmethod
def generate_coverage(print_lines, file, outpath, fuzz_data=None):
    """Merge the raw profile in ``outpath`` and summarise llvm-cov output.

    Runs ``llvm-profdata merge`` on ``default.profraw`` and then
    ``llvm-cov show`` on ``file``, both with ``outpath`` as working
    directory.  Any text on stderr from either tool is treated as failure.

    Args:
        print_lines: List of report lines; extended in place.
        file: Path of the fuzzed binary.
        outpath: Directory containing ``default.profraw``.
        fuzz_data: Optional dict forwarded to ``Libfuzz.print_coverage``.

    Returns:
        ``(target_line_covered, print_lines)``; ``target_line_covered`` stays
        0 when either tool reports a problem.
    """
    end_banner = "===============================COVERAGE ANALYSIS ENDS================================="
    target_line_covered = 0
    print_lines += [
        "===============================COVERAGE ANALYSIS BEGINS================================="
    ]

    merge = subprocess.run(
        ["llvm-profdata", "merge", "-sparse", "default.profraw", "-o", "default.profdata"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=outpath,
    )
    output = merge.stderr.decode("utf-8", errors="replace")
    if output.strip() != "":
        print_lines += ["Could not find profdata!"]
        print_lines += [output]
        print_lines += [end_banner]
        return target_line_covered, print_lines

    # Pass the binary path as its own argv element so that a path containing
    # blanks or quotes is not torn apart by shell-style splitting.
    show = subprocess.run(
        ["llvm-cov", "show", file, "-instr-profile=default.profdata"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=outpath,
    )
    output = show.stderr.decode("utf-8", errors="replace")
    if output.strip() != "":
        print_lines += ["Could not generate LLVM coverage"]
        print_lines += [output]
        print_lines += [end_banner]
        return target_line_covered, print_lines

    # Source text in the report is not guaranteed to be valid UTF-8.
    coverage_output = show.stdout.decode("utf-8", errors="replace")
    print_lines += ["Coverage results generated!"]
    target_line_covered, print_lines = Libfuzz.print_coverage(
        file, print_lines, coverage_output, fuzz_data
    )
    print_lines += [end_banner]
    return target_line_covered, print_lines
# Namespace of static helpers that fuzz every issue binary of one library,
# analyse coverage and crashes, and sort the issues into TP / FP / NR.
class Fuzzer:
# Name of the library being fuzzed; assigned by fuzz_binaries
test_library = ""
# Static analysis targets to compare (lines of targets.txt, assigned by fuzz_binaries)
targets = []
# The time for which Fuzzer runs (module-level timeout)
TIMEOUT = timeout
# Max length of fuzz bytes; overwritten per issue by approximate_fuzz_byte_len
max_len = max_length_fuzz_bytes
# Allow multiprocessing for issues
set_parallel_execution = parallel_execution
# Lock for writing to stdout (held by print_info while it logs)
lock = threading.Lock()
# False positives
FP = []
# True positives
TP = []
# Not reachable
NR = []
@staticmethod
def print_info(print_lines):
    """Log every entry of ``print_lines`` while holding ``Fuzzer.lock``.

    Args:
        print_lines: Iterable of report entries, logged one per call.
    """
    # The context manager releases the lock even if logging raises, which a
    # bare acquire()/release() pair does not.
    with Fuzzer.lock:
        for line in print_lines:
            logger.info(line)
@staticmethod
def print_crashes(print_lines, file, outpath, fuzzer):
    """Add the crash-analysis section for one issue to the report.

    Args:
        print_lines: List of report lines; extended in place.
        file: Path of the fuzzed binary.
        outpath: Output directory of the fuzzing run.
        fuzzer: ``1`` selects ``Aflfuzz.check_crashes``; any other value
            selects ``Libfuzz.check_crashes``.

    Returns:
        ``(is_confirmed, print_lines)``.
    """
    print_lines.append(
        "===============================CRASH ANALYSIS BEGINS================================="
    )

    checker = Aflfuzz.check_crashes if fuzzer == 1 else Libfuzz.check_crashes
    print_lines, is_confirmed, count = checker(print_lines, file, outpath)

    print_lines.append("Found number of crashes: " + str(count))
    verdict = (
        "Crash aligns with detected site"
        if is_confirmed
        else "Crash does not align with detected site"
    )
    print_lines.append(verdict)
    print_lines.append(
        "===============================CRASH ANALYSIS ENDS================================="
    )
    return is_confirmed, print_lines
@staticmethod
def identify_crash_line(print_lines, asan_output, file):
lines = asan_output.split("\n")
file_root = os.path.splitext(os.path.basename(file))[0]
issue = ""
crash_detected = 0
for line in lines:
m = re.findall("==ERROR: AddressSanitizer: ([\w-]+)", line)
if m:
issue = m[0]
crash_detected = 1
print_lines += ["Crash reason: " + issue]
if ("#" in line) and ((file_root + ".c") in line) and (crash_detected == 1):
print_lines += ["Crash at: " + line.split(" ")[-1]]
return [print_lines, line.split(" ")[-1]]
return [print_lines, ""]
@staticmethod
def confirm_crash_warning(print_lines, file, crash_line):
if crash_line.strip() == "":
return [print_lines, 0]
if ":" not in crash_line:
return [print_lines, 0]
crash_line_no = int(crash_line.split(":")[1])
crash_string = linecache.getline(file[:-3] + "c", crash_line_no)
print_lines += ["The crash happens at this point -> " + crash_string]
if "/*target_line*/" in crash_string.strip():
return [print_lines, 1]
return [print_lines, 0]
@staticmethod
def approximate_fuzz_byte_len(file):
    """Derive ``Fuzzer.max_len`` from the size annotations in the source file.

    The source file is ``file`` with its last three characters replaced by
    ``c``.  Lines containing ``// Buff size :`` are searched for a
    ``Buff size`` and a ``Dyn size`` number; the last values found win.

    Args:
        file: Path of the fuzzed binary.

    Side effects:
        Assigns ``Fuzzer.max_len`` and logs the chosen value.
    """
    # Require at least one digit: an empty capture would crash int().
    fixed_pattern = re.compile(r"Buff size : (\d+) ")
    dyn_pattern = re.compile(r"Dyn size : (\d+)")

    fixed_size = 0
    dyn_size = 0
    # The context manager closes the handle; the parameter is no longer
    # rebound to the file object.
    with open(file[:-3] + "c", "r") as source:
        for line in source:
            if "// Buff size :" not in line:
                continue
            fixed_match = fixed_pattern.search(line)
            if fixed_match:
                fixed_size = int(fixed_match.group(1))
            dyn_match = dyn_pattern.search(line)
            if dyn_match:
                dyn_size = int(dyn_match.group(1))

    # Max 1000 bytes per fixed object and 10000 bytes for dynamic objects
    approx_bytes = fixed_size * 1000 + dyn_size * 10000
    Fuzzer.max_len = approx_bytes
    # Please note that this could be wrong but is generally not so
    # Smaller fuzz bytes help to faster reach vulnerability
    logger.info("The fuzz bytes chosen for this issue is : " + str(approx_bytes))
@staticmethod
def fuzz_binary(print_lines, file, inpath, outpath, fuzzer):
    """Run the selected fuzzer on one binary and wait for it to finish.

    Args:
        print_lines: List of report lines; extended in place.
        file: Path of the binary to fuzz.
        inpath: Seed/corpus directory.
        outpath: Output directory of the fuzzing run.
        fuzzer: ``1`` selects the Aflfuzz command builder; any other value
            selects the Libfuzz one.

    Returns:
        ``print_lines``.
    """
    print_lines.append("File name to be fuzzed.... : " + file)

    # Must run before the command is built: it updates Fuzzer.max_len.
    Fuzzer.approximate_fuzz_byte_len(file)

    builder = (
        Aflfuzz.construct_fuzz_command
        if fuzzer == 1
        else Libfuzz.construct_fuzz_command
    )
    command, my_env, cwd = builder(file, inpath, outpath)
    print_lines.append(command)

    # Output of the fuzzer is captured and discarded.
    subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=cwd,
        shell=True,
        env=my_env,
    )

    print_lines.append("Done fuzzing file: " + file)
    return print_lines
@staticmethod
def flush_fuzz_dir(test_library):
dir = os.path.join("./fuzz", test_library)
if os.path.exists(dir):
shutil.rmtree(dir)
os.makedirs(dir)
@staticmethod
def prepare_directories(test_library, filename):
path = os.path.join("./fuzz/", test_library, filename)
os.mkdir(path)
inpath = os.path.abspath(os.path.join("./fuzz/", test_library, filename, "in"))
os.mkdir(inpath)
outpath = os.path.abspath(
os.path.join("./fuzz/", test_library, filename, "out")
)
os.mkdir(outpath)
with open(os.path.join(inpath, "seed"), "w") as f:
f.write("A" * 1000)
f.close()
return [inpath, outpath]
@staticmethod
def find_files(test_library, extension):
filenames = []
for filename in glob.glob(
os.path.join("./workspace", test_library, "test_files", "*" + extension)
):
filenames += [filename]
return filenames
@staticmethod
def build_report(sourcefiles, binaries):
    """Log the final summary of true positives, false positives and misses.

    Logs the contents of ``Fuzzer.TP``, ``Fuzzer.FP`` and ``Fuzzer.NR`` and
    then every source file that has no binary with the same basename
    (extension ignored).

    Args:
        sourcefiles: Paths of the source files.
        binaries: Paths of the compiled binaries.
    """
    logger.info(
        "===============================FUZZ REPORT================================="
    )
    sections = (
        ("Number of possible True positives: ", Fuzzer.TP),
        ("Number of possible False positives: ", Fuzzer.FP),
        ("Number of unreachable issues: ", Fuzzer.NR),
    )
    for heading, issues in sections:
        logger.info(heading + str(len(issues)))
        for issue in issues:
            logger.info(issue)
        logger.info("\n")

    logger.info(
        "Number of files that are not compiled :"
        + str(len(sourcefiles) - len(binaries))
    )
    # Compare whole basenames.  Searching only for the text before the first
    # dot anywhere in a binary's path made e.g. "1" match ".../13.x.out".
    compiled = {os.path.splitext(os.path.basename(path))[0] for path in binaries}
    for file in sourcefiles:
        if os.path.splitext(os.path.basename(file))[0] not in compiled:
            logger.info(file)

    logger.info(
        "===============================FUZZ REPORT END================================="
    )
@staticmethod
def process_issue(file, test_library, fuzzer, count_file, fuzz_data=None):
    """Fuzz one issue binary, analyse the outcome and classify the warning.

    Steps: create the issue's directories, run the fuzzer, collect
    coverage, then replay crashes.  The issue is recorded in ``Fuzzer.TP``
    when a crash hits the target line, in ``Fuzzer.FP`` when the target
    line was covered without such a crash, and in ``Fuzzer.NR`` otherwise.

    Args:
        file: Path of the binary to fuzz.
        test_library: Name of the library being fuzzed.
        fuzzer: ``1`` selects Aflfuzz; any other value selects Libfuzz.
        count_file: Running index of the issue, used in the report only.
        fuzz_data: Optional dict that receives ``type`` and the coverage
            fields written by ``Libfuzz.generate_coverage``.

    Returns:
        The list of report lines for this issue.
    """
    if fuzz_data is None:
        fuzz_data = {}

    file = os.path.abspath(file)
    print_lines = ["Fuzzing issue: " + str(count_file)]
    filename = os.path.basename(file)

    inpath, outpath = Fuzzer.prepare_directories(test_library, filename)
    print_lines = Fuzzer.fuzz_binary(print_lines, file, inpath, outpath, fuzzer)

    # The AFL path reports no target-line coverage; without this default the
    # classification below failed with an unbound local variable.
    target_line_covered = 0
    if fuzzer == 1:
        print_lines = Aflfuzz.generate_coverage(print_lines, file, outpath)
    else:
        target_line_covered, print_lines = Libfuzz.generate_coverage(
            print_lines, file, outpath, fuzz_data
        )

    # Has to happen after generating coverage so that coverage information is not spoilt!!
    target_crashes, print_lines = Fuzzer.print_crashes(
        print_lines, file, outpath, fuzzer
    )

    # Replace the file name by the matching static-analysis warning, if any.
    name_parts = filename.split(".")
    word_root = name_parts[-2]
    warning_no = name_parts[0]
    for static_warning in Fuzzer.targets:
        if static_warning.strip() == "":
            continue
        fields = static_warning.split(":")
        if len(fields) < 2:
            # A line without ":" carries no warning number; skip it instead
            # of failing with IndexError.
            continue
        if word_root in static_warning and warning_no == fields[1]:
            filename = static_warning
            break

    if target_crashes:
        fuzz_data["type"] = "TP"
        print_lines += ["This may be a True positive : " + filename]
        Fuzzer.TP += [filename]
    elif target_line_covered:
        fuzz_data["type"] = "FP"
        print_lines += ["This is false positive : " + filename]
        Fuzzer.FP += [filename]
    else:
        fuzz_data["type"] = "NR"
        print_lines += ["Vulnerability cannot be reached :" + filename]
        Fuzzer.NR += [filename]
    return print_lines
@staticmethod
@timeout_decorator.timeout(hard_timeout)
def fuzz_binaries(test_library, fuzzer):
    """Fuzz every ``.out`` binary of a library and log the final report.

    Reads ``./info_lib/<test_library>/targets.txt``, recreates the library's
    fuzz directory, processes each binary (through a multiprocessing pool
    when ``Fuzzer.set_parallel_execution`` is true) and calls
    ``Fuzzer.build_report``.  The whole call is wrapped by the timeout
    decorator with ``hard_timeout``.

    Args:
        test_library: Name of the library to fuzz.
        fuzzer: ``1`` selects Aflfuzz; any other value selects Libfuzz.

    Returns:
        The ``fuzz_data`` dict that is passed to ``process_issue`` in
        sequential mode (it is not passed in pool mode).
    """
    fuzz_data = {}
    logger.info(
        "===============================FUZZING STARTS================================="
    )
    files = Fuzzer.find_files(test_library, ".out")
    sourcefiles = Fuzzer.find_files(test_library, ".c")
    logger.info("Total number of source files : " + str(len(sourcefiles)))
    logger.info("The number of binaries to be fuzzed: " + str(len(files)))

    # The context manager closes the handle, which was previously left open.
    with open(f"./info_lib/{test_library}/targets.txt", "r") as targets_file:
        targets = targets_file.readlines()

    Fuzzer.test_library = test_library
    Fuzzer.targets = targets
    Fuzzer.flush_fuzz_dir(test_library)

    if Fuzzer.set_parallel_execution:
        # NOTE(review): process_issue then runs in worker processes, so its
        # updates to Fuzzer.TP/FP/NR presumably never reach this process and
        # the report below would be empty - confirm.
        pool = Pool()
        results = [
            pool.apply_async(
                Fuzzer.process_issue, [file, test_library, fuzzer, count_file]
            )
            for count_file, file in enumerate(files)
        ]
        for result in results:
            Fuzzer.print_info(result.get())
        pool.close()
        pool.join()
    else:
        for count_file, file in enumerate(files):
            print_lines = Fuzzer.process_issue(
                file, test_library, fuzzer, count_file, fuzz_data
            )
            Fuzzer.print_info(print_lines)

    Fuzzer.build_report(sourcefiles, files)
    logger.info(
        "===============================FUZZING ENDS================================="
    )
    return fuzz_data