Skip to content

Commit

Permalink
Add runtime and static type checks (#639)
Browse files Browse the repository at this point in the history
  • Loading branch information
Flova authored Jan 10, 2025
2 parents 574b6fb + 2169c6a commit c88ae76
Show file tree
Hide file tree
Showing 132 changed files with 965 additions and 478 deletions.
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,11 @@
"ros.distro": "iron",
"search.useIgnoreFiles": false,
"python.autoComplete.extraPaths": [
"/opt/openrobots/lib/python3.10/site-packages",
"/opt/ros/iron/lib/python3.10/site-packages"
],
"python.analysis.extraPaths": [
"/opt/openrobots/lib/python3.10/site-packages",
"/opt/ros/iron/lib/python3.10/site-packages"
],
"cmake.configureOnOpen": false,
Expand Down
19 changes: 12 additions & 7 deletions bitbots_behavior/bitbots_blackboard/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif()

find_package(bio_ik_msgs REQUIRED)
find_package(ament_cmake REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(rclpy REQUIRED)
find_package(tf2 REQUIRED)
find_package(bio_ik_msgs REQUIRED)
find_package(bitbots_docs REQUIRED)
find_package(bitbots_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(rclpy REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(std_msgs REQUIRED)
find_package(tf2_geometry_msgs REQUIRED)
find_package(std_srvs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(bitbots_docs REQUIRED)
find_package(tf2 REQUIRED)
find_package(tf2_geometry_msgs REQUIRED)

set(INCLUDE_DIRS
${bio_ik_msgs_INCLUDE_DIRS}
Expand Down Expand Up @@ -76,6 +76,11 @@ ament_export_dependencies(geometry_msgs)
ament_export_dependencies(bitbots_docs)
ament_export_include_directories(${INCLUDE_DIRS})

if(BUILD_TESTING)
find_package(ament_cmake_mypy REQUIRED)
ament_mypy(CONFIG_FILE "${CMAKE_CURRENT_LIST_DIR}/mypy.ini")
endif()

ament_python_install_package(${PROJECT_NAME})

ament_package()
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Setting up runtime type checking for this package
from beartype.claw import beartype_this_package

beartype_this_package()
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


class BodyBlackboard:
def __init__(self, node: Node, tf_buffer: tf2.Buffer):
def __init__(self, node: Node, tf_buffer: tf2.BufferInterface):
# References
self.node = node
self.tf_buffer = tf_buffer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING

from bitbots_utils.utils import nobeartype
from rclpy.node import Node

if TYPE_CHECKING:
Expand All @@ -9,6 +10,7 @@
class AbstractBlackboardCapsule:
"""Abstract class for blackboard capsules."""

def __init__(self, node: Node, blackboard: Optional["BodyBlackboard"] = None):
@nobeartype
def __init__(self, node: Node, blackboard: "BodyBlackboard"):
self._node = node
self._blackboard = blackboard
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, node, blackboard):
self.calc_base_costmap()
self.calc_gradients()

def robot_callback(self, msg: RobotArray):
def robot_callback(self, msg: RobotArray) -> None:
"""
Callback with new robot detections
"""
Expand All @@ -77,10 +77,11 @@ def robot_callback(self, msg: RobotArray):
# Publish debug costmap
self.publish_costmap()

def publish_costmap(self):
def publish_costmap(self) -> None:
"""
Publishes the costmap for rviz
"""
assert self.costmap is not None, "Costmap is not initialized"
# Normalize costmap to match the rviz color scheme in a good way
normalized_costmap = (
(255 - ((self.costmap - np.min(self.costmap)) / (np.max(self.costmap) - np.min(self.costmap))) * 255 / 2.1)
Expand Down Expand Up @@ -131,7 +132,7 @@ def get_pass_regions(self) -> np.ndarray:
# Smooth obstacle map
return gaussian_filter(costmap, pass_smooth)

def field_2_costmap_coord(self, x: float, y: float) -> Tuple[float, float]:
def field_2_costmap_coord(self, x: float, y: float) -> Tuple[int, int]:
"""
Converts a field position to the corresponding indices for the costmap.
Expand All @@ -153,10 +154,11 @@ def field_2_costmap_coord(self, x: float, y: float) -> Tuple[float, float]:
)
return idx_x, idx_y

def calc_gradients(self):
def calc_gradients(self) -> None:
"""
Recalculates the gradient map based on the current costmap.
"""
assert self.base_costmap is not None, "Base costmap is not initialized"
gradient = np.gradient(self.base_costmap)
norms = np.linalg.norm(gradient, axis=0)

Expand Down Expand Up @@ -186,7 +188,7 @@ def cost_at_relative_xy(self, x: float, y: float) -> float:

return self.get_cost_at_field_position(point.point.x, point.point.y)

def calc_base_costmap(self):
def calc_base_costmap(self) -> None:
"""
Builds the base costmap based on the behavior parameters.
This costmap includes a gradient towards the enemy goal and high costs outside the playable area
Expand All @@ -203,8 +205,8 @@ def calc_base_costmap(self):

# Create Grid
grid_x, grid_y = np.mgrid[
0 : self.field_length + self.map_margin * 2 : (self.field_length + self.map_margin * 2) * 10j,
0 : self.field_width + self.map_margin * 2 : (self.field_width + self.map_margin * 2) * 10j,
0 : self.field_length + self.map_margin * 2 : (self.field_length + self.map_margin * 2) * 10j, # type: ignore[misc]
0 : self.field_width + self.map_margin * 2 : (self.field_width + self.map_margin * 2) * 10j, # type: ignore[misc]
]

fix_points: List[Tuple[Tuple[float, float], float]] = []
Expand Down Expand Up @@ -278,7 +280,8 @@ def calc_base_costmap(self):
)

# Smooth the costmap to get more continuous gradients
self.base_costmap = gaussian_filter(interpolated, self.body_config["base_costmap_smoothing_sigma"])
base_costmap: np.ndarray = gaussian_filter(interpolated, self.body_config["base_costmap_smoothing_sigma"])
self.base_costmap = base_costmap
self.costmap = self.base_costmap.copy()

def get_gradient_at_field_position(self, x: float, y: float) -> Tuple[float, float]:
Expand All @@ -287,6 +290,7 @@ def get_gradient_at_field_position(self, x: float, y: float) -> Tuple[float, flo
:param x: Field coordinate in the x direction
:param y: Field coordinate in the y direction
"""
assert self.gradient_map is not None, "Gradient map is not initialized"
idx_x, idx_y = self.field_2_costmap_coord(x, y)
return -self.gradient_map[0][idx_x, idx_y], -self.gradient_map[1][idx_x, idx_y]

Expand All @@ -296,10 +300,11 @@ def get_cost_at_field_position(self, x: float, y: float) -> float:
:param x: Field coordinate in the x direction
:param y: Field coordinate in the y direction
"""
assert self.costmap is not None, "Costmap is not initialized"
idx_x, idx_y = self.field_2_costmap_coord(x, y)
return self.costmap[idx_x, idx_y]

def get_gradient_direction_at_field_position(self, x: float, y: float):
def get_gradient_direction_at_field_position(self, x: float, y: float) -> float:
"""
Returns the gradient direction at the given position
:param x: Field coordinate in the x direction
Expand All @@ -318,7 +323,9 @@ def get_gradient_direction_at_field_position(self, x: float, y: float):
grad = self.get_gradient_at_field_position(x, y)
return math.atan2(grad[1], grad[0])

def get_cost_of_kick_relative(self, x: float, y: float, direction: float, kick_length: float, angular_range: float):
def get_cost_of_kick_relative(
self, x: float, y: float, direction: float, kick_length: float, angular_range: float
) -> float:
"""
Returns the cost of a kick at the given position and direction in base footprint frame
:param x: Field coordinate in the x direction
Expand Down Expand Up @@ -356,6 +363,7 @@ def get_cost_of_kick(self, x: float, y: float, direction: float, kick_length: fl
:param kick_length: The length of the kick
:param angular_range: The angular range of the kick
"""
assert self.costmap is not None, "Costmap is not initialized"

# create a mask in the size of the costmap consisting of 8-bit values initialized as 0
mask = Image.new("L", (self.costmap.shape[1], self.costmap.shape[0]))
Expand Down Expand Up @@ -386,7 +394,7 @@ def get_cost_of_kick(self, x: float, y: float, direction: float, kick_length: fl
# This should contribute way less than the max and should have an impact if the max values are similar in all directions.
return masked_costmap.max() * 0.75 + masked_costmap.min() * 0.25

def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float):
def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float) -> float:
"""
Returns the cost of the kick at the current position
:param direction: The direction of the kick
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from bitbots_utils.utils import get_parameters_from_other_node
from game_controller_hl_interfaces.msg import GameState

Expand All @@ -9,96 +11,86 @@ class GameStatusCapsule(AbstractBlackboardCapsule):

def __init__(self, node, blackboard=None):
super().__init__(node, blackboard)
self.team_id = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
self.team_id: int = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
self.gamestate = GameState()
self.last_update = 0
self.unpenalized_time = 0
self.last_goal_from_us_time = -86400
self.last_goal_time = -86400
self.free_kick_kickoff_team = None

def is_game_state_equals(self, value):
assert value in [
GameState.GAMESTATE_PLAYING,
GameState.GAMESTATE_FINISHED,
GameState.GAMESTATE_INITIAL,
GameState.GAMESTATE_READY,
GameState.GAMESTATE_SET,
]
return value == self.get_gamestate()

def get_gamestate(self):
self.last_update: float = 0.0
self.unpenalized_time: float = 0.0
self.last_goal_from_us_time = -86400.0
self.last_goal_time = -86400.0
self.free_kick_kickoff_team: Optional[bool] = None

def get_gamestate(self) -> int:
return self.gamestate.game_state

def get_secondary_state(self):
def get_secondary_state(self) -> int:
return self.gamestate.secondary_state

def get_secondary_state_mode(self):
def get_secondary_state_mode(self) -> int:
return self.gamestate.secondary_state_mode

def get_secondary_team(self):
def get_secondary_team(self) -> int:
return self.gamestate.secondary_state_team

def has_kickoff(self):
def has_kickoff(self) -> bool:
return self.gamestate.has_kick_off

def has_penalty_kick(self):
def has_penalty_kick(self) -> bool:
return (
self.gamestate.secondary_state == GameState.STATE_PENALTYKICK
or self.gamestate.secondary_state == GameState.STATE_PENALTYSHOOT
) and self.gamestate._secondary_state_team == self.team_id

def get_own_goals(self):
def get_our_goals(self) -> int:
return self.gamestate.own_score

def get_opp_goals(self):
def get_opp_goals(self) -> int:
return self.gamestate.rival_score

def get_seconds_since_own_goal(self):
def get_seconds_since_own_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_from_us_time

def get_seconds_since_any_goal(self):
def get_seconds_since_any_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_time

def get_seconds_remaining(self):
def get_seconds_remaining(self) -> float:
# Time from the message minus time passed since receiving it
return max(
self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0
self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0.0
)

def get_secondary_seconds_remaining(self):
def get_secondary_seconds_remaining(self) -> float:
"""Seconds remaining for things like kickoff"""
# Time from the message minus time passed since receiving it
return max(
self.gamestate.secondary_seconds_remaining
- (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update),
0,
0.0,
)

def get_seconds_since_last_drop_ball(self):
def get_seconds_since_last_drop_ball(self) -> Optional[float]:
"""Returns the seconds since the last drop in"""
if self.gamestate.drop_in_time == -1:
return None
else:
# Time from the message plus seconds passed since receiving it
return self.gamestate.drop_in_time + (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update)

def get_seconds_since_unpenalized(self):
def get_seconds_since_unpenalized(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.unpenalized_time

def get_is_penalized(self):
def get_is_penalized(self) -> bool:
return self.gamestate.penalized

def received_gamestate(self):
return self.last_update != 0
def received_gamestate(self) -> bool:
return self.last_update != 0.0

def get_team_id(self):
def get_team_id(self) -> int:
return self.team_id

def get_red_cards(self):
def get_red_cards(self) -> int:
return self.gamestate.team_mates_with_red_card

def gamestate_callback(self, gamestate_msg: GameState):
def gamestate_callback(self, gamestate_msg: GameState) -> None:
if self.gamestate.penalized and not gamestate_msg.penalized:
self.unpenalized_time = self._node.get_clock().now().nanoseconds / 1e9

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class KickCapsule(AbstractBlackboardCapsule):
is_currently_kicking: bool = False

__connected: bool = False
__action_client: ActionClient = None
__action_client: Optional[ActionClient] = None

class WalkKickTargets(Flag):
"""
Expand All @@ -43,22 +43,22 @@ def __init__(self, node, blackboard):
self.walk_kick_pub = self._node.create_publisher(Bool, "/kick", 1)
# self.connect_dynamic_kick() Do not connect if dynamic_kick is disabled

def walk_kick(self, target: WalkKickTargets):
def walk_kick(self, target: WalkKickTargets) -> None:
"""
Kick the ball while walking
:param target: Target for the walk kick (e.g. left or right foot)
"""
self.walk_kick_pub.publish(Bool(data=target.value))

def connect_dynamic_kick(self):
def connect_dynamic_kick(self) -> None:
topic = self._blackboard.config["dynamic_kick"]["topic"]
self.__action_client = ActionClient(self._node, Kick, topic, callback_group=ReentrantCallbackGroup())
self.__connected = self.__action_client.wait_for_server(self._blackboard.config["dynamic_kick"]["wait_time"])

if not self.__connected:
self._node.get_logger().error(f"No dynamic_kick server running on {topic}")

def dynamic_kick(self, goal: Kick.Goal):
def dynamic_kick(self, goal: Kick.Goal) -> None:
"""
:param goal: Goal to kick to
:type goal: KickGoal
Expand Down
Loading

0 comments on commit c88ae76

Please sign in to comment.