ClusterFuzz的bot源码(fuzz task)阅读

阅读时使用的版本：ClusterFuzz v2.0.1

总览

运行 bot 的入口命令是 `python butler.py run_bot`，butler.py 中对应的代码如下：

# Excerpt from the entry point of butler.py (the enclosing function is not
# shown): parse the command line and dispatch to local.butler.<command>.
args = parser.parse_args()
if not args.command:
  parser.print_help()
  return

_setup()
# Import the sub-command module by name (e.g. local.butler.run_bot for
# `python butler.py run_bot`) and hand it the parsed arguments.
command = importlib.import_module('local.butler.%s' % args.command)
command.execute(args)

之后执行的是 src/local/butler/run_bot.py 中的 execute 函数：

def execute(args):
  """Run the bot."""
  appengine_path = appengine.find_sdk_path()

  _setup_bot_directory(args)
  _setup_environment_and_configs(args, appengine_path)

  try:
    os.chdir(os.path.join(args.directory, 'clusterfuzz'))
    # The bot itself runs as a child process; this process relays its output
    # and kills it on termination.
    proc = common.execute_async('python src/python/bot/startup/run_bot.py')

    def _stop_handler(*_):
      print('Bot has been stopped. Exit.')
      proc.kill()

    # Kill the child bot process when this process receives SIGTERM.
    signal.signal(signal.SIGTERM, _stop_handler)
    common.process_proc_output(proc)
    proc.wait()
  except KeyboardInterrupt:
    # NOTE(review): _stop_handler only exists once execution has reached its
    # `def` inside the try block; a Ctrl-C before that point would raise
    # UnboundLocalError here instead of stopping cleanly.
    _stop_handler()

execute 会在子进程中运行 src/python/bot/startup/run_bot.py，该文件的入口代码如下：

if __name__ == '__main__':
  if sys.version_info.major == 3:
    # TODO(ochang): Remove check once all migrated to Python 3.
    # Use the 'spawn' start method for child processes under Python 3.
    multiprocessing.set_start_method('spawn')

  try:
    # Run main() inside the NDB context provided by ndb_init.
    with ndb_init.context():
      main()
    exit_code = 0
  except Exception:
    traceback.print_exc()
    exit_code = 1

  monitor.stop()

  # Prevent python GIL deadlocks on shutdown. See https://crbug.com/744680.
  os._exit(exit_code)  # pylint: disable=protected-access

入口处做完一些初始化后就调用 main 函数，下面来看 main 函数：

def main():
  """Prepare the configuration options and start requesting tasks."""
  logs.configure('run_bot')

  root_directory = environment.get_value('ROOT_DIR')
  if not root_directory:
    print('Please set ROOT_DIR environment variable to the root of the source '
          'checkout before running. Exiting.')
    print('For an example, check init.bash in the local directory.')
    return

  dates.initialize_timezone_from_environment()
  environment.set_bot_environment()
  monitor.initialize()

  if not profiler.start_if_needed('python_profiler_bot'):
    sys.exit(-1)

  # Register the builtin fuzzing engines so engine.get() can find them later.
  fuzzers_init.run()

  if environment.is_trusted_host(ensure_connected=False):
    from bot.untrusted_runner import host
    host.init()

  if environment.is_untrusted_worker():
    # Track revision since we won't go into the task_loop.
    update_task.track_revision()

    from bot.untrusted_runner import untrusted as untrusted_worker
    untrusted_worker.start_server()
    assert False, 'Unreachable code'

  while True:
    # task_loop should be an infinite loop,
    # unless we run into an exception.
    # This is the core of the bot: fetch and execute tasks.
    error_stacktrace, clean_exit, task_payload = task_loop()

    # Print the error trace to the console.
    if not clean_exit:
      print('Exception occurred while running "%s".' % task_payload)
      print('-' * 80)
      print(error_stacktrace)
      print('-' * 80)

    # Stop the bot on a clean exit or on errors listed as fatal.
    should_terminate = (
        clean_exit or errors.error_in_list(error_stacktrace,
                                           errors.BOT_ERROR_TERMINATION_LIST))
    if should_terminate:
      return

    logs.log_error(
        'Task exited with exception (payload="%s").' % task_payload,
        error_stacktrace=error_stacktrace)

    should_hang = errors.error_in_list(error_stacktrace,
                                       errors.BOT_ERROR_HANG_LIST)
    if should_hang:
      logs.log('Start hanging forever.')
      while True:
        # Sleep to avoid consuming 100% of CPU.
        time.sleep(60)

    # See if our run timed out, if yes bail out.
    if data_handler.bot_run_timed_out():
      return

    # Clear the current exception.
    utils.exc_clear()

再看 task_loop() 函数：通过 task = tasks.get_task() 获取任务，再通过 commands.process_command(task) 执行命令并删除任务。

def task_loop():
  """Executes tasks indefinitely."""
  clean_exit = False
  while True:
    exception_occurred = False
    task = None
    # This caches the current environment on first run. Don't move this.
    environment.reset_environment()
    try:
      # Run regular updates.
      update_task.run()
      update_task.track_revision()

      # Fetch the next task to run.
      task = tasks.get_task()
      if not task:
        continue

      with _Monitor(task):
        with task.lease():
          # Execute the command and delete the task.
          commands.process_command(task)
    except SystemExit as e:
      exception_occurred = True
      clean_exit = (e.code == 0)
      if not clean_exit and not isinstance(e, untrusted.HostException):
        logs.log_error('SystemExit occurred while working on task.')
    except commands.AlreadyRunningError:
      # Another instance of this task is already running; keep looping.
      exception_occurred = False
    except Exception:
      logs.log_error('Error occurred while working on task.')
      exception_occurred = True

    if exception_occurred:
      # Prevent looping too quickly. See: crbug.com/644830
      failure_wait_interval = environment.get_value('FAIL_WAIT')
      time.sleep(utils.random_number(1, failure_wait_interval))
      break

  # Only reached after an exception: return its formatted trace, whether it
  # was a clean exit (SystemExit with code 0), and the failing task's payload.
  task_payload = task.payload() if task else None
  return traceback.format_exc(), clean_exit, task_payload

获取任务

先看 get_task 函数。可以看到它会依次尝试 command override、ML 任务、high-end 任务和普通任务，都没有时才回退到 fuzz 任务：

def get_task():
  """Get a task."""
  # A command override, if present, takes precedence over any queue.
  task = get_command_override()
  if task:
    return task

  # TODO(unassigned): Remove this hack.
  if environment.get_value('ML'):
    return get_regular_task(queue=ML_JOBS_TASKQUEUE)

  # Preemptible bots skip the high-end and regular queues.
  allow_all_tasks = not environment.get_value('PREEMPTIBLE')
  if allow_all_tasks:
    # Check the high-end jobs queue for bots with multiplier greater than 1.
    thread_multiplier = environment.get_value('THREAD_MULTIPLIER')
    if thread_multiplier and thread_multiplier > 1:
      task = get_high_end_task()
      if task:
        return task

    task = get_regular_task()
    if task:
      return task

  # Fall back to a fuzz task.
  task = get_fuzz_task()
  if not task:
    logs.log_error('Failed to get any fuzzing tasks. This should not happen.')
    time.sleep(TASK_EXCEPTION_WAIT_INTERVAL)

  return task

我们比较关心 fuzz，所以看 get_fuzz_task。它获取 fuzz 任务的 argument（即 fuzzer 名称）和 job：

def get_fuzz_task():
  """Try to get a fuzz task."""
  # argument is the selected fuzzer name; job is the selected job type.
  argument, job = fuzzer_selection.get_fuzz_task_payload()
  if not argument:
    return None

  return Task('fuzz', argument, job)

继续跟进 fuzzer_selection.get_fuzz_task_payload()。它从（Google Cloud）datastore 中查询当前平台对应的所有 FuzzerJob 记录，再按 actual_weight 加权随机选取一个返回：

def get_fuzz_task_payload(platform=None):
  """Select a fuzzer that can run on this platform."""
  if not platform:
    queue_override = environment.get_value('QUEUE_OVERRIDE')
    platform = queue_override if queue_override else environment.platform()

  # Query all fuzzer/job mappings registered for this platform.
  query = data_types.FuzzerJob.query()
  query = query.filter(data_types.FuzzerJob.platform == platform)

  mappings = list(ndb_utils.get_all_from_query(query))
  if not mappings:
    return None, None

  # Pick one mapping at random, weighted by its actual_weight (not uniformly).
  selection = utils.random_weighted_choice(
      mappings, weight_attribute='actual_weight')
  return selection.fuzzer, selection.job

执行任务

获取到任务后，由 commands.process_command 执行：

# pylint: disable=too-many-nested-blocks
# TODO(mbarbella): Rewrite this function to avoid nesting issues.
@set_task_payload
def process_command(task):
  """Figures out what to do with the given task and executes the command."""
  logs.log("Executing command '%s'" % task.payload())
  if not task.payload().strip():
    logs.log_error('Empty task received.')
    return

  # Parse task payload.
  task_name = task.command
  task_argument = task.argument
  job_name = task.job
  # (Part of the function body is omitted in this excerpt.)
  ......
  ......
  ......
  # Match the cpu architecture with the ones required in the job definition.
  # If they don't match, then bail out and recreate task.
  if not is_supported_cpu_arch_for_job():
    logs.log(
        'Unsupported cpu architecture specified in job definition, exiting.')
    tasks.add_task(task_name, task_argument, job_name)
    return

  # Initial cleanup.
  cleanup_task_state()

  start_web_server_if_needed()

  try:
    # Dispatch to the task module registered for task_name.
    run_command(task_name, task_argument, job_name)
  finally:
    # Final clean up.
    cleanup_task_state()

process_command 调用了 run_command，而 run_command 实际是通过 task_module.execute_task(task_argument, job_name) 执行任务的：

def run_command(task_name, task_argument, job_name):
  """Run the command."""
  if task_name not in COMMAND_MAP:
    logs.log_error("Unknown command '%s'" % task_name)
    return

  task_module = COMMAND_MAP[task_name]

  # If applicable, ensure this is the only instance of the task running.
  task_state_name = ' '.join([task_name, task_argument, job_name])
  if should_update_task_status(task_name):
    if not data_handler.update_task_status(task_state_name,
                                           data_types.TaskState.STARTED):
      logs.log('Another instance of "{}" already '
               'running, exiting.'.format(task_state_name))
      raise AlreadyRunningError

  try:
    # Execute the task, e.g. fuzz_task.execute_task for 'fuzz'.
    task_module.execute_task(task_argument, job_name)
  except errors.InvalidTestcaseError:
    # It is difficult to try to handle the case where a test case is deleted
    # during processing. Rather than trying to catch by checking every point
    # where a test case is reloaded from the datastore, just abort the task.
    logs.log_error('Test case %s no longer exists.' % task_argument)
  except BaseException:
    # On any other exceptions, update state to reflect error and re-raise.
    if should_update_task_status(task_name):
      data_handler.update_task_status(task_state_name,
                                      data_types.TaskState.ERROR)

    raise

  # Task completed successfully.
  if should_update_task_status(task_name):
    data_handler.update_task_status(task_state_name,
                                    data_types.TaskState.FINISHED)

run_command 一开始会判断 task_name 是否在 COMMAND_MAP 中。可以看到除了 fuzz 任务外，还有很多其他任务：

# Maps a task command name (Task.command) to the module whose execute_task()
# run_command() calls for it.
COMMAND_MAP = {
    'analyze': analyze_task,
    'blame': blame_task,
    'corpus_pruning': corpus_pruning_task,
    'fuzz': fuzz_task,
    'impact': impact_task,
    'minimize': minimize_task,
    'ml_train': ml_train_task,
    'progression': progression_task,
    'regression': regression_task,
    'symbolize': symbolize_task,
    'unpack': unpack_task,
    'upload_reports': upload_reports_task,
    'variant': variant_task,
}

继续跟进 task_module.execute_task。我们关注 fuzz，对应的就是 fuzz_task.execute_task：

def execute_task(fuzzer_name, job_type):
  """Runs the given fuzzer for one round."""
  # The base TEST_TIMEOUT is adjusted inside FuzzingSession using a randomly
  # picked timeout multiplier.
  test_timeout = environment.get_value('TEST_TIMEOUT')
  session = FuzzingSession(fuzzer_name, job_type, test_timeout)
  session.run()

它先获取超时时间，然后初始化 FuzzingSession。初始化代码如下：

class FuzzingSession(object):
  """Class for orchestrating fuzzing sessions."""

  # (Only __init__ is shown in this excerpt.)
  def __init__(self, fuzzer_name, job_type, test_timeout):
    self.fuzzer_name = fuzzer_name
    self.job_type = job_type

    # Set up randomly selected fuzzing parameters.
    self.redzone = pick_redzone()
    self.disable_ubsan = pick_ubsan_disabled(job_type)
    self.timeout_multiplier = pick_timeout_multiplier()
    self.window_argument = pick_window_argument()
    # Effective test timeout derived from the base value and the multiplier.
    self.test_timeout = set_test_timeout(test_timeout, self.timeout_multiplier)

    # Set up during run().
    self.testcase_directory = None
    self.data_directory = None

    # Fuzzing engine specific state.
    self.fuzz_target = None
    self.gcs_corpus = None

run 方法的代码如下：

def run(self):
  """Run the fuzzing session."""
  failure_wait_interval = environment.get_value('FAIL_WAIT')

  # Update LSAN local blacklist with global blacklist.
  is_lsan_enabled = environment.get_value('LSAN')
  if is_lsan_enabled:
    leak_blacklist.copy_global_to_local_blacklist()

  # For some binaries, we specify trials, which are sets of flags that we only
  # apply some of the time. Adjust APP_ARGS for them if needed.
  trials.setup_additional_args_for_app()

  # Ensure that the fuzzer still exists.
  logs.log('Setting up fuzzer and data bundles.')
  fuzzer = data_types.Fuzzer.query(
      data_types.Fuzzer.name == self.fuzzer_name).get()
  if not fuzzer or not setup.update_fuzzer_and_data_bundles(self.fuzzer_name):
    _track_fuzzer_run_result(self.fuzzer_name, 0, 0,
                             FuzzErrorCode.FUZZER_SETUP_FAILED)
    logs.log_error('Unable to setup fuzzer %s.' % self.fuzzer_name)

    # Artificial sleep to slow down continuous failed fuzzer runs if the bot is
    # using command override for task execution.
    time.sleep(failure_wait_interval)
    return

  self.testcase_directory = environment.get_value('FUZZ_INPUTS')

  # Set up a custom or regular build based on revision. By default, fuzzing
  # is done on trunk build (using revision=None). Otherwise, a job definition
  # can provide a revision to use via |APP_REVISION|.
  target_weights = fuzzer_selection.get_fuzz_target_weights()

  build_setup_result = build_manager.setup_build(
      environment.get_value('APP_REVISION'), target_weights=target_weights)
  # Check if we have an application path. If not, our build failed
  # to setup correctly.
  if not build_setup_result or not build_manager.check_app_path():
    _track_fuzzer_run_result(self.fuzzer_name, 0, 0,
                             FuzzErrorCode.BUILD_SETUP_FAILED)
    return

  dataflow_bucket_path = environment.get_value('DATAFLOW_BUILD_BUCKET_PATH')
  if dataflow_bucket_path:
    # Some fuzzing jobs may use auxiliary builds, such as DFSan instrumented
    # builds accompanying libFuzzer builds to enable DFT-based fuzzing.
    if not build_manager.setup_trunk_build(
        [dataflow_bucket_path], build_prefix='DATAFLOW'):
      logs.log_error('Failed to set up dataflow build.')

  # Save fuzz targets count to aid with CPU weighting.
  self._save_fuzz_targets_count()

  # Check if we have a bad build, i.e. one that crashes on startup.
  # If yes, bail out.
  logs.log('Checking for bad build.')
  crash_revision = environment.get_value('APP_REVISION')
  is_bad_build = testcase_manager.check_for_bad_build(self.job_type,
                                                      crash_revision)
  _track_build_run_result(self.job_type, crash_revision, is_bad_build)
  if is_bad_build:
    return

  # Data bundle directories can also have testcases which are kept in-place
  # because of dependencies.
  self.data_directory = setup.get_data_bundle_directory(self.fuzzer_name)
  if not self.data_directory:
    _track_fuzzer_run_result(self.fuzzer_name, 0, 0,
                             FuzzErrorCode.DATA_BUNDLE_SETUP_FAILED)
    logs.log_error(
        'Unable to setup data bundle %s.' % fuzzer.data_bundle_name)
    return

  # Fuzzers with a registered engine (see engine.register) use engine
  # fuzzing; all others go through blackbox fuzzing.
  engine_impl = engine.get(fuzzer.name)
  if engine_impl:
    crashes, fuzzer_metadata = self.do_engine_fuzzing(engine_impl)

    # Not applicable to engine fuzzers.
    testcase_file_paths = []
    testcases_metadata = {}
  else:
    fuzzer_directory = setup.get_fuzzer_directory(self.fuzzer_name)
    fuzzer_metadata, testcase_file_paths, testcases_metadata, crashes = (
        self.do_blackbox_fuzzing(fuzzer, fuzzer_directory, self.job_type))

  if crashes is None:
    # Error occurred in generate_blackbox_testcases.
    # TODO(ochang): Pipe this error a little better.
    return

  logs.log('Finished processing test cases.')

  platform = environment.platform()
  platform_id = environment.get_platform_id()

  # For Android, bring back device to a good state before analyzing crashes.
  if platform == 'ANDROID' and crashes:
    # Remove this variable so that application is fully shutdown before every
    # re-run of testcase. This is critical for reproducibility.
    environment.remove_key('CHILD_PROCESS_TERMINATION_PATTERN')

    # TODO(unassigned): Need to find a way to this efficiently before every
    # testcase is analyzed.
    android.device.initialize_device()

  logs.log('Raw crash count: ' + str(len(crashes)))

  project_name = data_handler.get_project_name(self.job_type)

  # Process and save crashes to datastore.
  bot_name = environment.get_value('BOT_NAME')
  new_crash_count, known_crash_count, processed_groups = process_crashes(
      crashes=crashes,
      context=Context(
          project_name=project_name,
          bot_name=bot_name,
          job_type=self.job_type,
          fuzz_target=self.fuzz_target,
          redzone=self.redzone,
          disable_ubsan=self.disable_ubsan,
          platform_id=platform_id,
          crash_revision=crash_revision,
          fuzzer_name=self.fuzzer_name,
          window_argument=self.window_argument,
          fuzzer_metadata=fuzzer_metadata,
          testcases_metadata=testcases_metadata,
          timeout_multiplier=self.timeout_multiplier,
          test_timeout=self.test_timeout,
          thread_wait_timeout=THREAD_WAIT_TIMEOUT,
          data_directory=self.data_directory))

  read_and_upload_testcase_run_stats(
      self.fuzzer_name, self.fully_qualified_fuzzer_name, self.job_type,
      crash_revision, testcase_file_paths)
  upload_job_run_stats(self.fully_qualified_fuzzer_name, self.job_type,
                       crash_revision, time.time(),
                       new_crash_count, known_crash_count,
                       len(testcase_file_paths), processed_groups)

  # Delete the fuzzed testcases. This is explicitly needed since
  # some testcases might reside on NFS and would otherwise be
  # left forever.
  for testcase_file_path in testcase_file_paths:
    shell.remove_file(testcase_file_path)

  # Explicit cleanup for large vars.
  del testcase_file_paths
  del testcases_metadata
  utils.python_gc()

代码比较长。前面做了一些初始化，之后如果 engine.get 能取到该 fuzzer 对应的引擎，就调用 self.do_engine_fuzzing(engine_impl) 进行 fuzz；否则走黑盒 fuzz，即 self.do_blackbox_fuzzing：

# Excerpt from FuzzingSession.run(): fuzzers with a registered engine use
# do_engine_fuzzing; all others use blackbox fuzzing.
engine_impl = engine.get(fuzzer.name)
if engine_impl:
  crashes, fuzzer_metadata = self.do_engine_fuzzing(engine_impl)

  # Not applicable to engine fuzzers.
  testcase_file_paths = []
  testcases_metadata = {}
else:
  fuzzer_directory = setup.get_fuzzer_directory(self.fuzzer_name)
  fuzzer_metadata, testcase_file_paths, testcases_metadata, crashes = (
      self.do_blackbox_fuzzing(fuzzer, fuzzer_directory, self.job_type))

最后是处理 crashes（process_crashes 会把 crash 保存到 datastore），并上传 testcase 运行统计和 job 运行统计。任务状态的更新不在这里，而是在前面的 run_command 中完成的：

# Excerpt from FuzzingSession.run().
# Process and save crashes to datastore.
bot_name = environment.get_value('BOT_NAME')
new_crash_count, known_crash_count, processed_groups = process_crashes(
    crashes=crashes,
    context=Context(
        project_name=project_name,
        bot_name=bot_name,
        job_type=self.job_type,
        fuzz_target=self.fuzz_target,
        redzone=self.redzone,
        disable_ubsan=self.disable_ubsan,
        platform_id=platform_id,
        crash_revision=crash_revision,
        fuzzer_name=self.fuzzer_name,
        window_argument=self.window_argument,
        fuzzer_metadata=fuzzer_metadata,
        testcases_metadata=testcases_metadata,
        timeout_multiplier=self.timeout_multiplier,
        test_timeout=self.test_timeout,
        thread_wait_timeout=THREAD_WAIT_TIMEOUT,
        data_directory=self.data_directory))

# Upload per-testcase run stats (the path list is empty for engine fuzzers),
# then job-level run stats including the new/known crash counts.
read_and_upload_testcase_run_stats(
    self.fuzzer_name, self.fully_qualified_fuzzer_name, self.job_type,
    crash_revision, testcase_file_paths)
upload_job_run_stats(self.fully_qualified_fuzzer_name, self.job_type,
                     crash_revision, time.time(),
                     new_crash_count, known_crash_count,
                     len(testcase_file_paths), processed_groups)

do_engine_fuzzing

它先同步 corpus，然后执行 MAX_TESTCASES 轮 fuzz，每轮都会上传日志、crash testcase 和统计数据：

def do_engine_fuzzing(self, engine_impl):
  """Run fuzzing engine."""
  # Record fuzz target.
  fuzz_target_name = environment.get_value('FUZZ_TARGET')
  self.fuzz_target = record_fuzz_target(engine_impl.name, fuzz_target_name,
                                        self.job_type)
  environment.set_value('FUZZER_NAME',
                        self.fuzz_target.fully_qualified_name())

  # Synchronize corpus files with GCS
  sync_corpus_directory = builtin.get_corpus_directory(
      self.data_directory, self.fuzz_target.project_qualified_name())
  self.sync_corpus(sync_corpus_directory)

  # Reset memory tool options.
  environment.reset_current_memory_tool_options(
      redzone_size=self.redzone, disable_ubsan=self.disable_ubsan)

  revision = environment.get_value('APP_REVISION')
  crashes = []
  fuzzer_metadata = {}
  return_code = 1  # Vanilla return-code for engine crashes.

  # Do the actual fuzzing: MAX_TESTCASES rounds (1 if the variable is unset).
  for fuzzing_round in range(environment.get_value('MAX_TESTCASES', 1)):
    logs.log('Fuzzing round {}.'.format(fuzzing_round))
    result, current_fuzzer_metadata = run_engine_fuzzer(
        engine_impl, self.fuzz_target.binary, sync_corpus_directory,
        self.testcase_directory)
    fuzzer_metadata.update(current_fuzzer_metadata)

    # Prepare stats.
    testcase_run = engine_common.get_testcase_run(result.stats,
                                                  result.command)

    # Upload logs, testcases (if there are crashes), and stats.
    # Use a consistent log time to allow correlating between logs, uploaded
    # testcases, and stats.
    log_time = datetime.datetime.utcfromtimestamp(
        float(testcase_run.timestamp))
    crash_result = CrashResult(return_code, result.time_executed, result.logs)
    log = testcase_manager.prepare_log_for_upload(
        crash_result.get_stacktrace(), return_code)
    testcase_manager.upload_log(log, log_time)

    for crash in result.crashes:
      testcase_manager.upload_testcase(crash.input_path, log_time)

    add_additional_testcase_run_data(testcase_run,
                                     self.fuzz_target.fully_qualified_name(),
                                     self.job_type, revision)
    upload_testcase_run_stats(testcase_run)
    if result.crashes:
      # Convert engine crashes to Crash objects, skipping falsy entries.
      crashes.extend([
          Crash.from_engine_crash(crash) for crash in result.crashes if crash
      ])

  logs.log('All fuzzing rounds complete.')
  self.sync_new_corpus_files()

  return crashes, fuzzer_metadata

每一轮中实际执行 fuzz 的是 run_engine_fuzzer：

def run_engine_fuzzer(engine_impl, target_name, sync_corpus_directory,
                      testcase_directory):
  """Run engine for fuzzing."""
  if environment.is_trusted_host():
    # On a trusted host, fuzzing is handed off through the untrusted runner
    # (tasks_host.engine_fuzz) instead of running locally.
    from bot.untrusted_runner import tasks_host
    return tasks_host.engine_fuzz(engine_impl, target_name,
                                  sync_corpus_directory, testcase_directory)

  build_dir = environment.get_value('BUILD_DIR')
  target_path = engine_common.find_fuzzer_path(build_dir, target_name)
  options = engine_impl.prepare(sync_corpus_directory, target_path, build_dir)

  fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
  # Call the selected engine's own fuzz() implementation.
  result = engine_impl.fuzz(target_path, options, testcase_directory,
                            fuzz_test_timeout)

  # Record each chosen strategy in the stats as 'strategy_<name>'.
  logs.log('Used strategies.', strategies=options.strategies)
  for strategy, value in six.iteritems(options.strategies):
    result.stats['strategy_' + strategy] = value

  # Format logs with header and strategy information.
  log_header = engine_common.get_log_header(result.command,
                                            environment.get_value('BOT_NAME'),
                                            result.time_executed)

  formatted_strategies = engine_common.format_fuzzing_strategies(
      options.strategies)

  result.logs = log_header + '\n' + result.logs + '\n' + formatted_strategies

  fuzzer_metadata = {
      'fuzzer_binary_name': target_name,
  }

  fuzzer_metadata.update(engine_common.get_all_issue_metadata(target_path))
  _add_issue_metadata_from_environment(fuzzer_metadata)
  return result, fuzzer_metadata

引擎类

上面运行 fuzzer 时，是通过 engine.get 获取引擎类的实例的。get 函数如下：

def get(name):
  """Get an implementation of a fuzzing engine, or None if one does not exist."""
  # Look up the class stored by register() and return a new instance of it.
  engine_class = _ENGINES.get(name)
  if engine_class:
    return engine_class()

  return None

在此之前需要先注册引擎：

def register(name, engine_class):
  """Register a fuzzing engine."""
  # Registering the same name twice is an error rather than an overwrite.
  if name in _ENGINES:
    raise ValueError('Engine {name} is already registered'.format(name=name))

  # Store the class itself; get() instantiates it on each lookup.
  _ENGINES[name] = engine_class

注册是在 src/python/bot/startup/run_bot.py 的 main 函数中完成的：

......
from bot.fuzzers import init as fuzzers_init
......
def main():
  # (Earlier statements of main() are omitted in this excerpt.)
  if not profiler.start_if_needed('python_profiler_bot'):
    sys.exit(-1)

  fuzzers_init.run()  # The fuzzing engines are registered here.

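跟进去可以看到注册了 libFuzzer、honggfuzz 和 syzkaller。疑问是为什么没有 afl：后面会看到，在这个版本中 afl 是 builtin fuzzer（位于 BUILTIN_FUZZERS 中），engine.get('afl') 返回 None，因此它走的是 do_blackbox_fuzzing 路径：
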
def run():
  """Initialise builtin fuzzing engines."""
  # Register each engine class under the fuzzer name that
  # FuzzingSession.run() later passes to engine.get().
  engine.register('libFuzzer', libFuzzer_engine.LibFuzzerEngine)
  engine.register('honggfuzz', honggfuzz_engine.HonggfuzzEngine)
  engine.register('syzkaller', syzkaller_engine.SyzkallerEngine)

do_blackbox_fuzzing

它先计算超时时间和要生成的 testcase 数量，然后调用 generate_blackbox_testcases 生成 testcase：

def do_blackbox_fuzzing(self, fuzzer, fuzzer_directory, job_type):
  """Run blackbox fuzzing. Currently also used for engine fuzzing."""
  # Set the thread timeout values.
  # TODO(ochang): Remove this hack once engine fuzzing refactor is complete.
  fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
  if fuzz_test_timeout:
    test_timeout = set_test_timeout(fuzz_test_timeout,
                                    self.timeout_multiplier)
  else:
    test_timeout = self.test_timeout

  thread_timeout = test_timeout

  # Determine number of testcases to process.
  testcase_count = environment.get_value('MAX_TESTCASES')

  # For timeout multiplier greater than 1, we need to decrease testcase count
  # to prevent exceeding task lease time.
  # NOTE(review): under Python 3, `/=` is true division, so testcase_count
  # becomes a float here; confirm that downstream consumers (e.g. the builtin
  # fuzzer's run()) accept a non-integer count.
  if self.timeout_multiplier > 1:
    testcase_count /= self.timeout_multiplier

  # Run the fuzzer to generate testcases. If error occurred while trying
  # to run the fuzzer, bail out.
  (error_occurred, testcase_file_paths, sync_corpus_directory,
   fuzzer_metadata) = self.generate_blackbox_testcases(
       fuzzer, fuzzer_directory, testcase_count)
  ......
  ......
......

self.generate_blackbox_testcases 会实际启动 fuzzer，其 docstring 为 "Run the blackbox fuzzer and generate testcases."：

def generate_blackbox_testcases(self, fuzzer, fuzzer_directory,
                                testcase_count):
  """Run the blackbox fuzzer and generate testcases."""
  # Helper variables.
  error_occurred = False
  fuzzer_revision = fuzzer.revision
  fuzzer_name = fuzzer.name
  sync_corpus_directory = None

  # Clear existing testcases (only if past task failed).
  testcase_directories = get_testcase_directories(self.testcase_directory,
                                                  self.data_directory)
  testcase_manager.remove_testcases_from_directories(testcase_directories)

  # Set an environment variable for fuzzer name.
  # TODO(ochang): Investigate removing this. Only users appear to be chromebot
  # fuzzer and fuzzer_logs, both of which can be removed.
  environment.set_value('FUZZER_NAME', fuzzer_name)

  # Set minimum redzone size, do not detect leaks and zero out the
  # quarantine size before running the fuzzer.
  environment.reset_current_memory_tool_options(
      redzone_size=16, leaks=False, quarantine_size_mb=0)

  if fuzzer.builtin:
    fuzzer_command = 'builtin'
    # Builtin fuzzers (afl and libFuzzer) come from BUILTIN_FUZZERS.
    builtin_fuzzer = builtin_fuzzers.get(fuzzer.name)

    builtin_result = builtin_fuzzer.run(
        self.data_directory, self.testcase_directory, testcase_count)

    fuzzer_output = builtin_result.output
    sync_corpus_directory = builtin_result.corpus_directory

    # Return code is always 0 as builtin fuzzers log errors directly.
    fuzzer_return_code = 0
  else:
    # Make sure we have a file to execute for the fuzzer.
    ......
    ......
    # Path of the fuzzer executable. It is invoked below with --input_dir,
    # --output_dir and --no_of_files to generate testcases.
    # Get the fuzzer executable and chdir to its base directory. This helps to
    # prevent referencing every file using __file__.
    fuzzer_executable = os.path.join(fuzzer_directory, fuzzer.executable_path)
    fuzzer_executable_directory = os.path.dirname(fuzzer_executable)
    ......
    ......
    # Build the fuzzer command execution string.
    command = shell.get_execute_command(fuzzer_executable)
    ......
    command_format = ('%s --input_dir%s%s --output_dir%s%s --no_of_files%s%d')
    fuzzer_command = str(
        command_format % (command, argument_seperator, self.data_directory,
                          argument_seperator, self.testcase_directory,
                          argument_seperator, testcase_count))
    fuzzer_timeout = environment.get_value('FUZZER_TIMEOUT')

    # Run the fuzzer.
    logs.log('Running fuzzer - %s.' % fuzzer_command)
    fuzzer_return_code, fuzzer_duration, fuzzer_output = (
        process_handler.run_process(
            fuzzer_command,
            current_working_directory=fuzzer_executable_directory,
            timeout=fuzzer_timeout,
            testcase_run=False,
            ignore_children=False))

下面是 builtin_fuzzers.get(fuzzer.name) 能获取到的 builtin fuzzer：

# Builtin fuzzers keyed by fuzzer name. The values are instances created once
# when this module is loaded, not classes.
BUILTIN_FUZZERS = {
    'afl': afl.Afl(),
    'libFuzzer': libFuzzer.LibFuzzer(),
}


def get(fuzzer_name):
  """Get the builtin fuzzer with the given name, or None."""
  if fuzzer_name not in BUILTIN_FUZZERS:
    return None

  # The same shared instance is returned on every call.
  return BUILTIN_FUZZERS[fuzzer_name]

下面是 do_blackbox_fuzzing 函数的后半部分，负责并行运行前面生成的 testcases：

............
............
# Start processing the testcases.
while test_number < len(testcase_file_paths):
  thread_index = 0
  threads = []

  temp_queue = process_handler.get_queue()
  if not temp_queue:
    process_handler.terminate_stale_application_instances()
    logs.log_error('Unable to create temporary crash queue.')
    break

  # Start up to max_threads testcase runs in this batch, each via an object
  # from process_handler.get_process().
  while thread_index < max_threads and test_number < len(
      testcase_file_paths):
    testcase_file_path = testcase_file_paths[test_number]
    gestures = testcases_metadata[testcase_file_path]['gestures']

    env_copy = environment.copy()
    # The trailing True is passed as upload_output.
    thread = process_handler.get_process()(
        target=testcase_manager.run_testcase_and_return_result_in_queue,
        args=(temp_queue, thread_index, testcase_file_path, gestures,
              env_copy, True))

    try:
      thread.start()
    # NOTE(review): bare except also catches KeyboardInterrupt/SystemExit.
    except:
      process_handler.terminate_stale_application_instances()
      thread_error_occurred = True
      logs.log_error('Unable to start new thread.')
      break

    threads.append(thread)
    thread_index += 1
    test_number += 1

    if test_number % testcases_before_stale_process_cleanup == 0:
      needs_stale_process_cleanup = True

    time.sleep(thread_delay)
............
............

上面每个进程执行的是 run_testcase_and_return_result_in_queue。它运行单个 testcase，把结果放入 crash 队列，并在 upload_output 为真时上传 testcase 和日志：

def run_testcase_and_return_result_in_queue(crash_queue,
                                            thread_index,
                                            file_path,
                                            gestures,
                                            env_copy,
                                            upload_output=False):
  """Run a single testcase and return crash results in the crash queue."""
  # Since this is running in its own process, initialize the log handler again.
  # This is needed for Windows where instances are not shared across child
  # processes. See:
  # https://stackoverflow.com/questions/34724643/python-logging-with-multiprocessing-root-logger-different-in-windows
  logs.configure('run_testcase', {
      'testcase_path': file_path,
  })

  # Also reinitialize NDB context for the same reason as above.
  with ndb_init.context():
    # This wrapper only sets up logging and the NDB context; the helper does
    # the actual run.
    _do_run_testcase_and_return_result_in_queue(
        crash_queue,
        thread_index,
        file_path,
        gestures,
        env_copy,
        upload_output=upload_output)

它内部又调用了 _do_run_testcase_and_return_result_in_queue。该函数运行 testcase，把 crash 以 Crash 对象放入 crash_queue，并上传 testcase 和日志。由此可见，这里只是执行已生成的 testcase；黑盒模式下真正生成 testcase（即 fuzz）的是 self.generate_blackbox_testcases：

def _do_run_testcase_and_return_result_in_queue(crash_queue,
                                                thread_index,
                                                file_path,
                                                gestures,
                                                env_copy,
                                                upload_output=False):
  """Run a single testcase and return crash results in the crash queue."""
  try:
    # Run testcase and check whether a crash occurred or not.
    return_code, crash_time, output = run_testcase(thread_index, file_path,
                                                   gestures, env_copy)

    # Pull testcase directory to host to get any stats files.
    if environment.is_trusted_host():
      from bot.untrusted_runner import file_host
      file_host.pull_testcases_from_worker()

    # Analyze the crash.
    crash_output = _get_crash_output(output)
    crash_result = CrashResult(return_code, crash_time, crash_output)

    # To provide consistency between stats and logs, we use timestamp taken
    # from stats when uploading logs and testcase.
    if upload_output:
      log_time = _get_testcase_time(file_path)

    if crash_result.is_crash():
      # Initialize resource list with the testcase path.
      resource_list = [file_path]
      resource_list += get_resource_paths(crash_output)

      # Store the crash stack file in the crash stacktrace directory
      # with filename as the hash of the testcase path.
      crash_stacks_directory = environment.get_value('CRASH_STACKTRACES_DIR')
      stack_file_path = os.path.join(crash_stacks_directory,
                                     utils.string_hash(file_path))
      utils.write_data_to_file(crash_output, stack_file_path)

      # Put crash/no-crash results in the crash queue.
      # NOTE(review): despite the wording above, only crashing runs reach this
      # put(); non-crashing runs enqueue nothing.
      crash_queue.put(
          Crash(
              file_path=file_path,
              crash_time=crash_time,
              return_code=return_code,
              resource_list=resource_list,
              gestures=gestures,
              stack_file_path=stack_file_path))

      # Don't upload uninteresting testcases (no crash) or if there is no log to
      # correlate it with (not upload_output).
      if upload_output:
        upload_testcase(file_path, log_time)

    if upload_output:
      # Include full output for uploaded logs (crash output, merge output, etc).
      crash_result_full = CrashResult(return_code, crash_time, output)
      log = prepare_log_for_upload(crash_result_full.get_stacktrace(),
                                   return_code)
      upload_log(log, log_time)
  except Exception:
    # Errors are logged, not propagated, so one bad testcase does not take
    # down the child process with a traceback.
    logs.log_error('Exception occurred while running '
                   'run_testcase_and_return_result_in_queue.')

如何获取要运行的target

我们上传的 zip 包里可能有多个 target，其中有些可能只是 fuzzer 所需的文件，那么 bot 是怎么找到要运行的 target 的呢？

我们跟踪一下

对于 fuzz 任务，会进入 src/python/bot/tasks/fuzz_task.py 执行 execute_task：

def execute_task(fuzzer_name, job_type):
  """Runs the given fuzzer for one round."""
  # fuzzer_name / job_type are passed straight through to FuzzingSession,
  # together with the TEST_TIMEOUT value read from the environment.
  # All real work (build setup, target selection, fuzzing) happens in
  # FuzzingSession.run().
  test_timeout = environment.get_value('TEST_TIMEOUT')
  session = FuzzingSession(fuzzer_name, job_type, test_timeout)
  session.run()

上面调用FuzzingSession类里面的run函数,在run函数里面https://github.com/google/clusterfuzz/blob/9c2065a7f7b7802936b1133733402adc65ac0c4c/src/python/bot/tasks/fuzz_task.py#L1843这里调用了build_manager.setup_build

1
2
build_setup_result = build_manager.setup_build(
environment.get_value('APP_REVISION'), target_weights=target_weights)

跟进build_manager.setup_build

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def setup_build(revision=0, target_weights=None):
  """Set up a custom or regular build based on revision."""
  # Chooses exactly ONE build setup path; the first matching condition wins:
  #   1. CUSTOM_BINARY set                 -> setup_custom_binary
  #   2. SYSTEM_BINARY_DIR set             -> setup_system_binary
  #   3. FUZZ_TARGET_BUILD_BUCKET_PATH set -> _setup_split_targets_build
  #   4. truthy |revision|                 -> setup_regular_build
  #   5. otherwise                         -> setup_trunk_build
  # |target_weights| is forwarded to every path except setup_system_binary.
  # Returns whatever the selected setup function returns.

  # For custom binaries we always use the latest version. Revision is ignored.
  custom_binary = environment.get_value('CUSTOM_BINARY')
  if custom_binary:
    return setup_custom_binary(target_weights=target_weights)

  # In this case, we assume the build is already installed on the system.
  system_binary = environment.get_value('SYSTEM_BINARY_DIR')
  if system_binary:
    return setup_system_binary()

  fuzz_target_build_bucket_path = environment.get_value(
      'FUZZ_TARGET_BUILD_BUCKET_PATH')
  if fuzz_target_build_bucket_path:
    # Split fuzz target build.
    return _setup_split_targets_build(
        fuzz_target_build_bucket_path, target_weights, revision=revision)

  if revision:
    # Setup regular build with revision.
    return setup_regular_build(revision, target_weights=target_weights)

  # If no revision is provided, we default to a trunk build.
  # Collect every non-empty bucket path named by DEFAULT_BUILD_BUCKET_PATH_ENV_VARS.
  bucket_paths = []
  for env_var in DEFAULT_BUILD_BUCKET_PATH_ENV_VARS:
    bucket_path = environment.get_value(env_var)
    if bucket_path:
      bucket_paths.append(bucket_path)

  return setup_trunk_build(bucket_paths, target_weights=target_weights)

setup_build 中的setup_custom_binary

首先是setup_custom_binary,里面调用了CustomBuild的setup

1
2
3
4
5
6
7
8
9
10
11
12
13
build = CustomBuild(
base_build_dir,
job.custom_binary_key,
job.custom_binary_filename,
job.custom_binary_revision,
target_weights=target_weights)

# Revert back the actual job name.
if share_build_job_type:
environment.set_value('JOB_NAME', old_job_name)

if build.setup():
return build

看setup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def setup(self):
  """Set up the custom binary for a particular job."""
  # Returns True on success, False if unpacking the custom build failed.
  self._pre_setup()

  # Track the key for the custom binary so we can create a download link
  # later.
  environment.set_value('BUILD_KEY', self.custom_binary_key)

  logs.log('Retrieving custom binary build r%d.' % self.revision)

  # revisions.needs_update compares the revision file inside build_dir with
  # self.revision to decide whether the archive must be unpacked again.
  revision_file = os.path.join(self.build_dir, REVISION_FILE_NAME)
  build_update = revisions.needs_update(revision_file, self.revision)

  if build_update:
    # Unpack the uploaded archive (per the article, this calls archive.unpack
    # internally).
    if not self._unpack_custom_build():
      return False

    logs.log('Retrieved custom binary build r%d.' % self.revision)
  else:
    logs.log('Build already exists.')

  # Fuzz target selection: the candidates are the fuzz targets detected in
  # build_dir, and the pick is weighted by target_weights. This runs whether
  # or not the build was just unpacked.
  _set_random_fuzz_target_for_fuzzing_if_needed(
      self._get_fuzz_targets_from_dir(self.build_dir), self.target_weights)

  self._setup_application_path(build_update=build_update)
  # Only ask to update the stored revision when a new build was unpacked.
  self._post_setup_success(update_revision=build_update)
  return True

上面的_set_random_fuzz_target_for_fuzzing_if_needed就选择压缩包中的二进制文件了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def _set_random_fuzz_target_for_fuzzing_if_needed(fuzz_targets, target_weights):
  """Sets a random fuzz target for fuzzing."""
  # fuzz_targets: iterable of candidate target names (may be a generator; it
  #   is materialized with list() below so it can be counted).
  # target_weights: forwarded to fuzzer_selection.select_fuzz_target.
  # Returns the chosen target name, or None when no target is picked.
  # Side effects: sets FUZZ_TARGET_COUNT and FUZZ_TARGET in the environment.

  # A FUZZ_TARGET already present in the environment wins; nothing is re-picked.
  fuzz_target = environment.get_value('FUZZ_TARGET')
  if fuzz_target:
    logs.log('Use previously picked fuzz target %s for fuzzing.' % fuzz_target)
    return fuzz_target

  # Only engine-fuzzer jobs select a target; other jobs get None.
  if not environment.is_engine_fuzzer_job():
    return None

  fuzz_targets = list(fuzz_targets)
  if not fuzz_targets:
    logs.log_error('No fuzz targets found. Unable to pick random one.')
    return None

  environment.set_value('FUZZ_TARGET_COUNT', len(fuzz_targets))

  fuzz_target = fuzzer_selection.select_fuzz_target(fuzz_targets,
                                                    target_weights)
  environment.set_value('FUZZ_TARGET', fuzz_target)
  logs.log('Picked fuzz target %s for fuzzing.' % fuzz_target)

  return fuzz_target

但候选列表来自 _get_fuzz_targets_from_dir,之后才根据 target_weights 从这个候选列表里加权选出一个,所以第一步筛选还是在 _get_fuzz_targets_from_dir 中完成,它负责确保候选文件确实是一个 fuzzer。

CustomBuild 自己没有定义这个方法,所以实际调用的是父类的 _get_fuzz_targets_from_dir,可以看到它调用了 fuzzer_utils.get_fuzz_targets

1
2
3
4
5
6
7
def _get_fuzz_targets_from_dir(self, build_dir):
  """Get iterator of fuzz targets from build dir."""
  # This is a generator: it yields target *names*, i.e. the basename of each
  # detected fuzz target path with its extension stripped
  # (e.g. 'foo_fuzzer.exe' -> 'foo_fuzzer').
  # Import here as this path is not available in App Engine context.
  from bot.fuzzers import utils as fuzzer_utils

  for path in fuzzer_utils.get_fuzz_targets(build_dir):
    yield os.path.splitext(os.path.basename(path))[0]

而这个 fuzzer_utils.get_fuzz_targets 来自 `from bot.fuzzers import utils as fuzzer_utils`,也就是 src/python/bot/fuzzers/utils.py 里面的 get_fuzz_targets:

1
2
3
4
5
6
def get_fuzz_targets(path):
  """Get list of fuzz targets paths."""
  # On a trusted host the lookup is delegated to file_host.get_fuzz_targets
  # (presumably because the build files live on the untrusted worker; confirm).
  # Otherwise the local directory tree is scanned.
  if environment.is_trusted_host():
    from bot.untrusted_runner import file_host
    return file_host.get_fuzz_targets(path)
  return get_fuzz_targets_local(path)

继续跟进get_fuzz_targets_local

1
2
3
4
5
6
7
8
9
10
11
def get_fuzz_targets_local(path):
  """Get list of fuzz targets paths (local)."""
  # Walks the directory tree under |path| via shell.walk, which yields
  # (root, dirs, files) tuples like os.walk. Keeps every file that
  # is_fuzz_target_local accepts. Returns full file paths, not target names.
  fuzz_target_paths = []

  for root, _, files in shell.walk(path):
    for filename in files:
      file_path = os.path.join(root, filename)
      if is_fuzz_target_local(file_path):
        fuzz_target_paths.append(file_path)

  return fuzz_target_paths

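核心判断逻辑在 is_fuzz_target_local 中,按顺序检查:

1、首先文件名(去掉后缀)必须匹配正则 VALID_TARGET_NAME,即只能包含字母、数字、下划线和连字符
2、后缀名必须在 ALLOWED_FUZZ_TARGET_EXTENSIONS 中,即无后缀、.exe 或 .par
3、如果文件名以 _fuzzer 结尾,直接判定为 fuzz target,不再读取文件内容;如果设置了环境变量 FUZZER_NAME_REGEX,则只用这个正则匹配文件名来决定结果
4、否则(且必须是普通文件),文件内容中要包含 FUZZ_TARGET_SEARCH_BYTES,也就是 LLVMFuzzerTestOneInput 这个函数名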

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# Extensions a fuzz target file may have: none, '.exe' or '.par'.
ALLOWED_FUZZ_TARGET_EXTENSIONS = ['', '.exe', '.par']
# Symbol searched for in the file contents: the libFuzzer entry point name.
FUZZ_TARGET_SEARCH_BYTES = b'LLVMFuzzerTestOneInput'
# Target names (basename without extension) may only use [A-Za-z0-9_-].
VALID_TARGET_NAME = re.compile(r'^[a-zA-Z0-9_-]+$')

def is_fuzz_target_local(file_path, file_handle=None):
  #TODO(hzawawy): handle syzkaller case.
  """Returns whether |file_path| is a fuzz target binary (local path)."""
  # Checks, in order (the first decisive one returns):
  #   1. Basename without extension must match VALID_TARGET_NAME.
  #   2. Extension must be in ALLOWED_FUZZ_TARGET_EXTENSIONS.
  #   3. Without |file_handle|, the path must exist.
  #   4. Names ending in '_fuzzer' are accepted without reading the file.
  #   5. If FUZZER_NAME_REGEX is set, its match on the name is the answer.
  #   6. Existing non-regular files (devices, FIFOs, ...) are rejected.
  #   7. Otherwise the contents are searched for FUZZ_TARGET_SEARCH_BYTES.
  # |file_handle|, if given, is read but not closed; the caller owns it.
  filename, file_extension = os.path.splitext(os.path.basename(file_path))
  if not VALID_TARGET_NAME.match(filename):
    # Check fuzz target has a valid name (without any special chars).
    return False

  if file_extension not in ALLOWED_FUZZ_TARGET_EXTENSIONS:
    # Ignore files with disallowed extensions (to prevent opening e.g. .zips).
    return False

  if not file_handle and not os.path.exists(file_path):
    # Ignore non-existent files for cases when we don't have a file handle.
    return False

  if filename.endswith('_fuzzer'):
    return True

  # TODO(aarya): Remove this optimization if it does not show up significant
  # savings in profiling results.
  fuzz_target_name_regex = environment.get_value('FUZZER_NAME_REGEX')
  if fuzz_target_name_regex:
    # re.match anchors only at the start of the name, so an unanchored
    # FUZZER_NAME_REGEX behaves as a prefix match.
    return bool(re.match(fuzz_target_name_regex, filename))

  if os.path.exists(file_path) and not stat.S_ISREG(os.stat(file_path).st_mode):
    # Don't read special files (eg: /dev/urandom).
    logs.log_warn('Tried to read from non-regular file: %s.' % file_path)
    return False

  # Use already provided file handle or open the file.
  # NOTE(review): a handle opened here is not closed if search_bytes_in_file
  # raises; a try/finally (or `with`) around the search would prevent the leak.
  local_file_handle = file_handle or open(file_path, 'rb')

  # TODO(metzman): Bound this call so we don't read forever if something went
  # wrong.
  result = utils.search_bytes_in_file(FUZZ_TARGET_SEARCH_BYTES,
                                      local_file_handle)

  if not file_handle:
    # If this local file handle is owned by our function, close it now.
    # Otherwise, it is caller's responsibility.
    local_file_handle.close()

  return result
打赏专区