Reading notes on the PASTIS sample synchronization code

A quick look at the sample synchronization code in https://github.com/quarkslab/pastis/

Fuzzer side

The fuzzer side has only two jobs: sending newly found samples or crashes, and receiving new samples.

Sending new samples or crashes

Taking AFL++ as an example: https://github.com/quarkslab/pastis/blob/56f71b9c7cf25ddf2035d1abbe35f67d55378bb9/engines/pastis-aflpp/pastisaflpp/driver.py#L51

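The driver hooks file creation in corpus_dir and crash_dir. As soon as a new file appears, it calls __send_seed or __send_crash respectively. A modification of AFL's fuzzer_stats file likewise triggers __send_telemetry.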

# Configure hookds on workspace
self.workspace.add_creation_hook(self.workspace.corpus_dir, self.__send_seed)
self.workspace.add_creation_hook(self.workspace.crash_dir, self.__send_crash)
self.workspace.add_file_modification_hook(self.workspace.stats_dir, self.__send_telemetry)
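
To make the idea of a creation hook concrete, here is a minimal sketch that detects new files by polling a directory. The PollingWatcher class and its poll method are hypothetical names invented for this illustration; this is not how the PASTIS workspace is implemented, only the same callback pattern in a few lines.

```python
from pathlib import Path
from typing import Callable, Dict, List, Set


class PollingWatcher:
    """Hypothetical stand-in for workspace hooks: finds new files by polling."""

    def __init__(self) -> None:
        self._hooks: Dict[Path, List[Callable[[Path], None]]] = {}
        self._seen: Dict[Path, Set[str]] = {}

    def add_creation_hook(self, directory: Path, callback: Callable[[Path], None]) -> None:
        directory = Path(directory)
        self._hooks.setdefault(directory, []).append(callback)
        # Files already present when the hook is installed are not reported.
        self._seen.setdefault(directory, {p.name for p in directory.iterdir()})

    def poll(self) -> None:
        """Call every registered callback once per file created since the last poll."""
        for directory, callbacks in self._hooks.items():
            current = {p.name for p in directory.iterdir() if p.is_file()}
            for name in sorted(current - self._seen[directory]):
                for callback in callbacks:
                    callback(directory / name)
            self._seen[directory] = self._seen[directory] | current
```

A caller would register a callback per directory and invoke poll() periodically; each new file is reported exactly once.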

The send functions are as follows:

def __send_seed(self, filename: Path):
    self.__send(filename, SeedType.INPUT)

def __send_crash(self, filename: Path):
    # Skip README file that AFL adds to the crash folder.
    if filename.name != 'README.txt':
        self.__send(filename, SeedType.CRASH)

def __send(self, filename: Path, typ: SeedType):
    self._tot_seeds += 1
    file = Path(filename)
    raw = file.read_bytes()
    h = self.hash_seed(raw)
    logging.debug(f'[{typ.name}] Sending new: {h} [{self._tot_seeds}]')
    if h not in self._seed_recvs:
        self._agent.send_seed(typ, raw)
    else:
        logging.info("seed (previously sent) do not send it back")
    self._queue_to_send.append((filename, True if typ == SeedType.CRASH else False))
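
The check against _seed_recvs keeps the fuzzer from echoing back seeds it originally got from the broker, presumably because an imported seed can reappear as a new file in the corpus directory and fire the creation hook. A minimal sketch of that filter follows; the EchoFilter class is a hypothetical name, and using MD5 as the digest is an assumption, since the body of hash_seed is not shown here.

```python
import hashlib
from typing import Callable, Set


class EchoFilter:
    """Hypothetical helper: forwards local seeds unless the broker sent them first."""

    def __init__(self, send: Callable[[bytes], None]) -> None:
        self._send = send
        self._received: Set[str] = set()

    @staticmethod
    def digest(data: bytes) -> str:
        # Assumption: MD5, mirroring the digest used for seed file names.
        return hashlib.md5(data).hexdigest()

    def on_received(self, data: bytes) -> None:
        """Remember the digest of a seed that came from the broker."""
        self._received.add(self.digest(data))

    def on_local_file(self, data: bytes) -> bool:
        """Send a locally created seed; return False if it was suppressed."""
        if self.digest(data) in self._received:
            return False
        self._send(data)
        return True
```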

Receiving new samples

The __init__ function of the AFLPPDriver class calls self.__setup_agent() to register the callbacks:

def __setup_agent(self):
    # Register callbacks.
    self._agent.register_seed_callback(self.__seed_received)
    self._agent.register_stop_callback(self.__stop_received)

The receive function:

def __seed_received(self, typ: SeedType, seed: bytes):
    h = self.hash_seed(seed)
    logging.info(f"[SEED] received {h} ({typ.name})")
    self._seed_recvs.add(h)
    self.add_seed(seed)

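add_seed shows that each seed is stored under a file name derived from its MD5 digest, so received seeds are deduplicated by MD5: identical content always maps to the same file.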

def add_seed(self, seed: bytes):
    seed_path = self.workspace.dynamic_input_dir / f"seed-{hashlib.md5(seed).hexdigest()}"
    seed_path.write_bytes(seed)
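
The effect of content-addressed file names can be checked with a standalone version of the same function. This is a sketch, not the driver's code: the directory is passed in explicitly instead of coming from self.workspace, and the path is returned for convenience.

```python
import hashlib
from pathlib import Path


def add_seed(directory: Path, seed: bytes) -> Path:
    """Write a seed under a name derived from the MD5 of its content."""
    seed_path = directory / f"seed-{hashlib.md5(seed).hexdigest()}"
    # Writing identical content again overwrites the same file, so no duplicate appears.
    seed_path.write_bytes(seed)
    return seed_path
```

Calling it twice with the same bytes leaves a single file in the directory, while different bytes produce a second file.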

Server side

The following function registers the callback invoked when a sample (seed) is received:

def register_seed_callback(self, cb: Callable) -> None:
    """
    Register a callback called when an input seed is received from the
    broker. The callback function take 2 parameters seed type and content.

    :param cb: callback function
    """
    self.register_callback(MessageType.INPUT_SEED, cb)

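This callback is set in the set_proxy function. The snippet below, _register_all, registers seed_received together with the other handlers: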

def _register_all(self):
    self.register_seed_callback(self.seed_received)
    self.register_hello_callback(self.hello_received)
    self.register_log_callback(self.log_received)
    self.register_telemetry_callback(self.telemetry_received)
    self.register_stop_coverage_callback(self.stop_coverage_received)
    self.register_data_callback(self.data_received)

The __init__ function of the PastisBroker class calls self._register_all(), so seed_received is invoked whenever a seed arrives.

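Note that an MD5 digest is computed here, but it is only used in the log message, not for the duplicate check. Whether a seed is new is decided solely by looking it up in the _seed_pool field, which is keyed by the raw seed bytes: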

def seed_received(self, cli_id: bytes, typ: SeedType, seed: bytes):
    cli = self.get_client(cli_id)
    if not cli:
        return
    is_new = seed not in self._seed_pool
    h = md5(seed).hexdigest()

    # Show log message and save seed to file
    self.statmanager.update_seed_stat(cli, typ)  # Add info only if new
    cli.log(LogLevel.INFO, f"seed {h} [{self._colored_seed_type(typ)}][{self._colored_seed_newness(is_new)}]")
    cli.add_own_seed(seed)  # Add seed in client's seed
    self.write_seed(typ, cli.strid, seed)  # Write seed to file

    if is_new:
        self._seed_pool[seed] = typ  # Save it in the local pool
    else:
        pass
        # logging.warning(f"receive duplicate seed {h} by {cli.strid}")

    # Iterate on all clients and send it to whomever never received it
    if self.broker_mode == BrokingMode.FULL:
        self.send_seed_to_all_others(cli.netid, typ, seed)

    if self.is_proxied:  # Forward it to the proxy
        self._proxy.send_seed(typ, seed)
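
The broker-side flow can be modelled in a few lines: a pool keyed by the raw seed bytes decides novelty, and each seed is forwarded to every other client that has not seen it yet. MiniBroker, outbox and known are hypothetical names for this sketch. The per-client bookkeeping is an assumption based on the "whomever never received it" comment, since the body of send_seed_to_all_others is not shown here.

```python
from typing import Dict, Iterable, List, Set


class MiniBroker:
    """Hypothetical toy model of the broker's seed pool and fan-out."""

    def __init__(self, client_ids: Iterable[str]) -> None:
        ids = list(client_ids)
        self.pool: Dict[bytes, str] = {}  # raw seed bytes -> seed type
        self.outbox: Dict[str, List[bytes]] = {cid: [] for cid in ids}
        self.known: Dict[str, Set[bytes]] = {cid: set() for cid in ids}

    def seed_received(self, sender: str, typ: str, seed: bytes) -> bool:
        """Record a seed from `sender`, forward it, and report whether it was new."""
        is_new = seed not in self.pool  # lookup by raw bytes, no digest involved
        self.known[sender].add(seed)
        if is_new:
            self.pool[seed] = typ
        # Forward to every other client that never received or produced this seed.
        for cid in self.outbox:
            if cid != sender and seed not in self.known[cid]:
                self.outbox[cid].append(seed)
                self.known[cid].add(seed)
        return is_new
```

With three clients, a seed sent by the first one lands once in the outbox of the other two, and a later resend by any of them is reported as a duplicate and forwarded to nobody.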