Skip to content

Commit

Permalink
qemu_guest_agent: Add new api support 'guest-network-get-route'
Browse files Browse the repository at this point in the history
Add new api 'guest-network-get-route' to support qga query route
info of Guest.

Signed-off-by: Dehan Meng <[email protected]>
  • Loading branch information
6-dehan committed Jan 20, 2025
1 parent b2108ff commit 038735f
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 0 deletions.
7 changes: 7 additions & 0 deletions qemu/tests/cfg/qemu_guest_agent.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@
image_snapshot = no
cmd_disable_network = wmic path win32_networkadapter where "NetConnectionID='%s'" call disable
cmd_enable_network = wmic path win32_networkadapter where "NetConnectionID='%s'" call enable
- check_get_network_route:
only Linux
gagent_check_type = get_network_route
ipv4_keys = "iface", "destination", "mask", "metric", "gateway"
ipv6_keys = "iface", "destination", "metric", "desprefixlen", "nexthop"
cmd_ipv4route = "ip route"
cmd_ipv6route = "ip -6 route"
- check_fsfreeze:
gagent_fs_test_cmd = "echo foo > %s/foo"
delete_temp_file = "rm -rf ${mountpoint_def}/foo"
Expand Down
175 changes: 175 additions & 0 deletions qemu/tests/qemu_guest_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,181 @@ def ip_addr_check(session, mac_addr, ret_list, if_index, if_name):
if session_serial:
session_serial.close()

@error_context.context_aware
def gagent_check_get_network_route(self, test, params, env):
"""
Execute "guest-network-get-route" command to guest agent
Steps:
1) Get route info from guest-agent API 'guest-network-get-route'
2) Get route info from inside the guest
3) Compare the route info from qga and guest
:param test: kvm test object
:param params: Dictionary with the test parameters
:param env: Dictionary with test environment.
"""

import ipaddress

def parse_ipv4_route(route_output):
"""
parse ipv4 route info and process the info.
:return: ipv4 route info
"""
routes = route_output.strip().splitlines()
processed_routes = []

for line in routes:
# if 'default via' starts the line, handle specifically.
if line.startswith("default via"):
parts = line.split()
gateway = parts[2]
iface = parts[4]
metric = parts[-1] if "metric" in line else None
destination = "0.0.0.0"
mask = "0.0.0.0"
processed_routes.append(
{
"iface": iface,
"destination": destination,
"mask": mask,
"gateway": gateway,
"metric": metric,
}
)
else:
match = re.match(
r"(?P<destination>[\d./]+)\s+dev\s+(?P<iface>\S+).*?\s+metric\s+(?P<metric>\d+)",
line,
)
if match:
destination = match.group("destination")
iface = match.group("iface")
metric = match.group("metric")

if "/" in destination:
ip, prefixlen = destination.split("/")
mask = str(
ipaddress.IPv4Network(f"0.0.0.0/{prefixlen}").netmask
)
else:
ip = destination
mask = "255.255.255.255"

gateway = "0.0.0.0"

processed_routes.append(
{
"iface": iface,
"destination": ip,
"mask": mask,
"gateway": gateway,
"metric": metric,
}
)

return processed_routes

def parse_ipv6_route(route_output):
"""
parse ipv6 route info and process the info.
:return: ipv6 route info
"""
routes = route_output.strip().splitlines()
processed_routes = []
route_pattern = re.compile(
r"(?P<destination>[\w:\/]+)\s+dev\s+(?P<iface>\S+)\s+proto\s+\S+\s+metric\s+(?P<metric>\d+)\s+pref\s+\S+"
)
for line in routes:
if line.startswith("default via"):
line = line.replace("default via", "").strip()
destination = "::"
nexthop = line.split()[0]
iface = line.split()[2]
metric = line.split()[-3]
desprefixlen = 0 # default desprefixlen should be 0
processed_routes.append(
{
"iface": iface,
"destination": destination,
"desprefixlen": desprefixlen,
"nexthop": nexthop,
"metric": metric,
}
)
continue
match = route_pattern.match(line)
if match:
destination = match.group("destination")
iface = match.group("iface")
metric = match.group("metric")

if "/" in destination:
destination = destination.split("/")[0]
desprefixlen = 64
else:
desprefixlen = 128
nexthop = "::"
processed_routes.append(
{
"iface": iface,
"destination": destination,
"desprefixlen": desprefixlen,
"nexthop": nexthop,
"metric": metric,
}
)
return processed_routes

session = self._get_session(params, self.vm)
self._open_session_list.append(session)

error_context.context("Getting route info via guest-agent API.", LOG_JOB.info)
qga_route_info = self.gagent.get_network_route()

error_context.context("Getting route info from inside the guest.", LOG_JOB.info)
ipv4_routes = parse_ipv4_route(session.cmd_output(self.params["cmd_ipv4route"]))
ipv6_routes = parse_ipv6_route(session.cmd_output(self.params["cmd_ipv6route"]))
guest_route_info = ipv4_routes + ipv6_routes

error_context.context(
"Compare the route info from qga and guest.", LOG_JOB.info
)
ipv4_keys = list(self.params["ipv4_keys"])
ipv6_keys = list(self.params["ipv6_keys"])

for guest_entry in guest_route_info:
matched_qga_entry = None
for qga_entry in qga_route_info:
if all(
guest_entry.get(key) == qga_entry.get(key)
for key in ["iface", "destination"]
):
matched_qga_entry = qga_entry
break

if matched_qga_entry:
keys_to_compare = ipv4_keys if "mask" in guest_entry else ipv6_keys
differences = {}
for key in keys_to_compare:
guest_value = guest_entry.get(key)
qga_value = matched_qga_entry.get(key)
if str(guest_value) != str(qga_value):
differences[key] = {"guest": guest_value, "qga": qga_value}

if differences:
test.fail(f"There are differences found: {differences}")
else:
test.error(
f"No matching QGA entry found for Guest entry: {guest_entry}"
)

if session:
session.close()

@error_context.context_aware
def gagent_check_reboot_shutdown(self, test, params, env):
"""
Expand Down

0 comments on commit 038735f

Please sign in to comment.