Skip to content

Commit

Permalink
Merge branch 'main' into reelection_take2
Browse files Browse the repository at this point in the history
  • Loading branch information
achamayou authored Feb 1, 2024
2 parents 9079997 + 9ca9965 commit 8d98280
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backport.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ jobs:
# Update backport action (https://github.com/sqren/backport/issues/391#issuecomment-1156355381)
- name: Backport Action
uses: sqren/backport-github-action@main
uses: sorenlouv/backport-github-action@main
with:
github_token: ${{ secrets.BACKPORT_ACTION }}
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,10 @@ if(BUILD_TESTS)
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/connections.py
)

add_e2e_test(
NAME fuzz_test PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/fuzzing.py
)

if(CLIENT_PROTOCOLS_TEST)
add_e2e_test(
NAME client_protocols
Expand Down
2 changes: 1 addition & 1 deletion python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
loguru >= 0.5, == 0.*
cryptography == 41.*
cryptography == 42.*
string-color >= 1.2.1
pycose >= 1.0.1
13 changes: 11 additions & 2 deletions src/host/node_connections.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,17 @@ namespace asynchost
}

const auto size_post_headers = size;
const size_t payload_size =
msg_size.value() - (size_pre_headers - size_post_headers);
const auto header_size = size_pre_headers - size_post_headers;
if (header_size > msg_size.value())
{
LOG_DEBUG_FMT(
"Received invalid node-to-node traffic. Total msg size {} "
"doesn't even contain headers (of size {})",
msg_size.value(),
header_size);
return false;
}
const size_t payload_size = msg_size.value() - header_size;

if (!node.has_value())
{
Expand Down
1 change: 1 addition & 0 deletions tests/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ def node_tcp_socket(node):
s.close()


# NB: This does rudimentary smoke testing. See fuzzing.py for more thorough test
def run_node_socket_robustness_tests(args):
with infra.network.network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
Expand Down
208 changes: 208 additions & 0 deletions tests/fuzzing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import infra.e2e_args
import infra.network
import struct
import boofuzz
import datetime
from loguru import logger as LOG


class CCFFuzzLogger(boofuzz.IFuzzLogger):
def __init__(self, print_period=datetime.timedelta(seconds=3), keep_lines=50):
self.log_lines = []
self.print_period = print_period
self.last_printed = None
self.keep_lines = keep_lines

self.session = None

def _store_line(self, s):
self.log_lines.append(s)
self.log_lines = self.log_lines[-self.keep_lines :]

if self.session is not None:
now = datetime.datetime.now()
if self.last_printed is None or now - self.last_printed > self.print_period:
LOG.info(
f"Fuzzed {self.session.num_cases_actually_fuzzed} total cases in {self.session.runtime:.2f}s (rate={self.session.exec_speed:.2f}/s)"
)
self.last_printed = now

def open_test_case(self, test_case_id, name, index, *args, **kwargs):
self._store_line(f"Test case: {name} ({index=})")

def open_test_step(self, description):
self._store_line(f" Test step: {description}")

def log_send(self, data):
self._store_line(infra.clients.escape_loguru_tags(f" Sent: {data}"))

def log_recv(self, data):
self._store_line(infra.clients.escape_loguru_tags(f" Received: {data}"))

def log_check(self, description):
self._store_line(f" Checking: {description}")

def log_pass(self, description=""):
self._store_line(f" Passed: {description}")

def log_fail(self, description=""):
self._store_line(f" Fail: {description}")

def log_info(self, description):
self._store_line(f" {description}")

def log_error(self, description):
self._store_line(f" Error: {description}")

def close_test_case(self):
pass

def close_test(self):
pass


def ccf_node_post_send(node):
def post_send_callback(fuzz_data_logger=None, *args, **kwargs):
done = node.remote.check_done()
if done:
fuzz_data_logger.log_error("Node has exited")
return done

return post_send_callback


def ccf_node_restart_callback(*args, **kwargs):
raise boofuzz.exception.BoofuzzRestartFailedError(
"CCF nodes cannot be restarted - see earlier failure"
)


def fuzz_node_to_node(network, args):
req = boofuzz.Request(
"N2N",
children=[
boofuzz.Block(
"Header",
children=[
boofuzz.Size(
"TotalSize",
block_name="N2N",
length=4,
# Non-inclusive. inclusive=False doesn't work, so manually offset
offset=-4,
),
boofuzz.Group(
"MessageType",
values=[struct.pack("<Q", msg_type) for msg_type in (0, 1, 2)],
),
boofuzz.Block(
"SenderID",
children=[
boofuzz.Size(
"SenderSize",
block_name="SenderContent",
length=8,
inclusive=False,
),
boofuzz.RandomData(
"SenderContent",
default_value="OtherNode".encode(),
max_length=32,
),
],
),
],
),
# TODO: Different types of body, based on message type?
boofuzz.Block(
"Body",
children=[
boofuzz.RandomData(
"BodyContent",
max_length=128,
),
],
),
],
)

primary, _ = network.find_primary()
interface = primary.n2n_interface

fuzz_logger = CCFFuzzLogger()
session = boofuzz.Session(
target=boofuzz.Target(
connection=boofuzz.TCPSocketConnection(interface.host, interface.port),
),
# Check if the node process is alive after each send
post_test_case_callbacks=[ccf_node_post_send(primary)],
# Fail if ever asked to restart a node
restart_callbacks=[ccf_node_restart_callback],
# Use loguru output formatted like everything else
fuzz_loggers=[fuzz_logger],
# Don't try to host a web UI
web_port=None,
# Don't try to read any responses
receive_data_after_fuzz=False,
receive_data_after_each_request=False,
)
fuzz_logger.session = session

session.connect(req)

LOG.warning("These tests are verbose and run for a long time")
LOG.warning(
f"Limiting spam by summarising every {fuzz_logger.print_period.total_seconds()}s"
)

LOG.info("Confirming non-fuzzed request format")
try:
session.feature_check()
except:
LOG.error("Error during feature check")
LOG.error(
"Recent fuzz session output was:\n" + "\n".join(fuzz_logger.log_lines)
)
raise

LOG.info("Fuzzing")
try:
session.fuzz(max_depth=2)
except:
LOG.error("Error during fuzzing")
LOG.error(
"Recent fuzz session output was:\n" + "\n".join(fuzz_logger.log_lines)
)
raise

LOG.info(f"Fuzzed {session.num_cases_actually_fuzzed} cases")


def run(args):
with infra.network.network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
network.start_and_open(args)

# Don't fill the output with failure messages from fuzzing
network.ignore_error_pattern_on_shutdown(
"Exception in bool ccf::Channel::recv_key_exchange_message"
)
network.ignore_error_pattern_on_shutdown(
"Exception in void ccf::Forwarder<ccf::NodeToNode>::recv_message"
)
network.ignore_error_pattern_on_shutdown("Unknown node message type")
network.ignore_error_pattern_on_shutdown("Unhandled AFT message type")
network.ignore_error_pattern_on_shutdown("Unknown frontend msg type")

fuzz_node_to_node(network, args)


if __name__ == "__main__":
args = infra.e2e_args.cli_args()
args.package = "samples/apps/logging/liblogging"

args.nodes = infra.e2e_args.min_nodes(args, f=0)
run(args)
Original file line number Diff line number Diff line change
@@ -1,31 +1,59 @@
# This scenario re-creates figure 8 from the Raft paper

nodes,0,1,2,3,4
start_node,0
assert_is_primary,0
emit_signature,2

connect,0,1
connect,0,2
connect,0,3
connect,0,4
assert_is_primary,0
assert_commit_idx,0,2

# Node 0 starts first, becomes primary
periodic_one,0,110
trust_node,2,1
emit_signature,2

dispatch_all
periodic_all,10
dispatch_all
assert_commit_idx,0,4

trust_node,2,2
emit_signature,2

dispatch_all
periodic_all,10
dispatch_all
assert_commit_idx,0,6

trust_node,2,3
emit_signature,2

dispatch_all
periodic_all,10
dispatch_all
assert_commit_idx,0,8

trust_node,2,4
emit_signature,2

dispatch_all
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
assert_commit_idx,0,10
assert_state_sync

# Node 0 appends a single entry, replicates it to all nodes
replicate,1,entry_1
emit_signature,1
replicate,2,entry_1
emit_signature,2

periodic_all,10
dispatch_all

# Node 0 appends a second entry, which only reaches Node 1
disconnect_node,0
connect,0,1
replicate,1,entry_2
emit_signature,1
replicate,2,entry_2
emit_signature,2

periodic_all,10
dispatch_all
Expand All @@ -38,8 +66,8 @@ connect,4,2
periodic_one,4,110
dispatch_all

replicate,2,entry_3
emit_signature,2
replicate,3,entry_3
emit_signature,3

state_all

Expand Down Expand Up @@ -91,8 +119,8 @@ dispatch_all
state_all

# Not yet! We need node 4 to produce a new entry in its current term
replicate,4,entry_4
emit_signature,4
replicate,5,entry_4
emit_signature,5

periodic_all,10
dispatch_all
Expand Down
4 changes: 2 additions & 2 deletions tests/raft_scenarios_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ def preprocess_for_trace_validation(log):
return log
log_by_node = defaultdict(list)
initial_node = None
last_cmd = None
last_cmd = ""
for line in log:
entry = json.loads(line)
if "cmd" in entry:
last_cmd = entry["cmd"]
continue
node = entry["msg"]["state"]["node_id"]
entry["cmd"] = last_cmd
last_cmd = None
last_cmd = ""
if initial_node is None:
initial_node = node
if entry["msg"]["function"] == "add_configuration":
Expand Down
3 changes: 2 additions & 1 deletion tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ gelidum
fastparquet==2023.*
prettytable==3.*
polars
plotext
plotext
boofuzz
2 changes: 1 addition & 1 deletion tests/trace_viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def table(lines):
f"[{entry['h_ts']:>{dcfg.ts}}] "
+ " ".join(render_state(*state, dcfg) for state in states if state[0])
+ " "
+ (entry["cmd"] or "")
+ entry["cmd"]
)
return rows

Expand Down
Loading

0 comments on commit 8d98280

Please sign in to comment.