-
Notifications
You must be signed in to change notification settings - Fork 0
Week1
- A quick demonstration
- ROS refresher
- MDK structure
- Worked Example: MiRo playing with the toy ball
- Exercises
Now that you've gone through the Getting Started
page you should have an open WSL-ROS terminal window with the MiRo Developer Kit environment variables.
Let's start with a quick demonstration, by launch the Gazebo simulator with MiRo and opening up an interactive GUI to control it.
You'll need to have three terminal windows (or tabs) opened and execute the following commands:
- [Terminal 1]:
$ roscore
- [Terminal 2]:
$ miro_sim
- [Terminal 3]:
$ miro_gui
Now you should be able to see MiRo in the arena and be able to control it.
The miro_sim
command in terminal 2 is a shorthand for changing the folder to the MDK simulator folder cd ~/mdk/sim
and launching the simulator with ./launch_sim
, while the miro_gui
command in terminal 3 is an alias for going to the scripts folder with cd ~/mdk/bin/shared
and running ./client_gui.py
.
All the MDK files live in a dedicated (soft-linked) folder, located in the user's home directory: ~/mdk
When developing programs be careful to use the MDK catkin workspace at
~/mdk/catkin_ws
, not the TB3 workspace at~/catkin_ws
!
MiRo uses the Robot Operating System (ROS) as middleware to manage communication of data between program code and robot sensors and effectors.
Every ROS network requires a single continually running ROS master (or ROS core) that manages all connected systems.
The environmental variable $ROS_MASTER_URI specifies where to start (or look for) the ROS core on the network.
When using a physical MiRo, a ROS core is started when the robot boots, therefore you don’t need to start one.
When working with the simulator, the $ROS_MASTER_URI points at your machine through loclahost
and you can simply type roscore
to start one.
You don't have to change $ROS_MASTER_URI on WSL machines, but you will learn in the next session how to quickly do that for the laptops, since those can be used for all three MiRo profiles.
A ROS node is “a process that performs computation”.
A MiRo robot in demo mode will have several co-existing ROS nodes managing the cameras and general sensory state, and your script will need to initialise a ROS node to issue and receive ROS commands.
The rosnode command can perform various operations on ROS nodes.
For example, if you have the simulator running or connected to the real robot, you can run $ rosnode list
in the terminal to view the currently running nodes.
ROS topics are named buses that carry messages containing the actual packaged data sent between ROS network nodes.
Run $ rostopic list
in the terminal to view the topics currently running on your MiRo.
For example, the topic /miro/sensors/caml
contains message data from MiRo’s left camera, and /miro/control/kinematic_joints
contains message data that controls the movement of MiRo’s head and neck.
Messages are encoded as a specific type; these may be a standard type such as String
or a MiRo-specific type such as JointState
.
The rostopic
and rosmsg
commands can perform various operations on ROS topics and messages respectively.
All ROS topics are available to all nodes on the network; any node may publish (send data to) a topic of any name and may similarly subscribe to (request data from) any topic. For more information, see the ROS Computation Graph section of the ROS Wiki.
When it comes to programming language, ROS (and hence MDK) widely supports only two languages: C++ and Python. However, most of the MDK code is written in Python, which is why this guide covers Python only.
NOTE: Before the latest release, the MDK only supported ROS Kinetic and Melodic, which by default use Python 2. We aim to write our code in agnostic way, but occasionally the Python 2/3 inconsistency may crop up. Please let us know if spot any.
The essential package that facilitates the ROS-Python communication is rospy
.
To get started with it, you will need to, at a minimum:
- Import message type definitions
- Initialise a ROS node
- Set up a publisher and/or subscriber to communicate with other nodes on the ROS network
These steps might look like this:
Message types
from std_msgs.msg import Float32MultiArray, UInt32MultiArray, UInt16MultiArray, UInt8MultiArray, UInt16, UInt32, Int16MultiArray, String
from geometry_msgs.msg import Pose2D, TwistStamped
from sensor_msgs.msg import BatteryState, CompressedImage, Image, Imu, JointState, Range
You may of course omit message types you do not intend to use.
ROS Node
rospy.init_node('<NODE NAME>', anonymous=True)
The anonymous=True
flag is not strictly required but will help avoid conflicting node names.
ROS Publisher
pub = rospy.Publisher('<TOPIC NAME>', <MESSAGE TYPE>, queue_size=<N>)
pub.publish(<DATA>)
ROS Subscriber
rospy.Subscriber('<TOPIC NAME>', <MESSAGE TYPE>, <FUNCTION>)
Here, <FUNCTION>
is callback function that will be called every time this ROS topic receives a new message.
Going further, it may be useful to look at the client GUI (~/mdk/bin/shared/client_gui.py
), the demo code (~/mdk/share/python/miro2/core/client_demo.py
), or one of the provided examples to see how this works in practice.
The MDK (MiRo Developer Kit) is an assortment of code and libraries required to communicate with and program the MiRo robot.
Inside the MDK there are many useful tools and packages, including:
- Code for MiRo’s autonomous mode (
~/mdk/share/python/miro2
) - Virtual MiRo model and world files, plus a Gazebo launch script (
~/mdk/sim
) - Various tests and utils (
~/mdk/bin/shared
) - MiRo-specific Python packages and ROS message definitions (
~/mdk/catkin_ws/src/miro2_msg
)
You should have three terminal windows open from previously, running ROS core, Gazebo simulator and Client GUI, respectively. Stop the third one with CTRL+C. Clone and compile the COM3528 code repo, and then 'source' so that ROS knows about the new package:
[Terminal 3] $ cd ~/mdk/catkin_ws/src
[Terminal 3] $ git clone https://github.com/AlexandrLucas/COM3528
[Terminal 3] $ catkin build
[Terminal 3] $ src
Now run the kick_blue_ball.py
script, either directly:
[Terminal 3] $ roscd com3528_examples/src
[Terminal 3] $ python3 kick_blue_ball.py
or using rosrun
:
[Terminal 3] $ rosrun com3528_examples kick_blue_ball.py
In this program, image processing is employed to detect a blue ball on both camera images.
Based on the relative position of the ball on the images, MiRo tries to turn so that it faces the ball and then move forward to try and hit it.
Now let's have a look at the code in more detail.
#!/usr/bin/env python3
The presence of the shebang line informs the terminal shell that this file can be run directly and if so, what program should be used to do so.
Now, assuming you're still in the same folder, try calling the script directly:
[Tab 3] $ ./kick_blue_ball.py
If you receive errors running the script this way, check that it has the correct permissions.
To make a file executable, runchmod
command:$ chmod +x kick_blue_ball.py
However, with the ROS packaging system, one can go even further.
Having a compiled package in the catkin_ws
means that ROS nodes can be called through rosrun
.
There are 2 main advantages of this approach:
- Autocompletion
- Path independence
This means you can be in any directory and only remember the first letters of the package and the script name to run it.
[Tab 3] $ rosrun mi[TAB]ro2_e[TAB]xamples ki[TAB]ck_blue_ball.py
Doing a double TAB input lists all of the available options.
This
rosrun
functionality should be enabled by default in the WSL-ROS environment.
If using a custom ROS installation, add
source ~/mdk/catkin_ws/devel/setup.bash
on the next line aftersource ~/mdk/setup.bash
in your.bashrc
profile.
# Imports
##########################
import os
from math import radians # This is used to reset the head pose
import numpy as np # Numerical Analysis library
import cv2 # Computer Vision library
import rospy # ROS Python interface
from sensor_msgs.msg import CompressedImage # ROS CompressedImage message
from sensor_msgs.msg import JointState # ROS joints state message
from cv_bridge import CvBridge, CvBridgeError # ROS -> OpenCV converter
from geometry_msgs.msg import TwistStamped # ROS cmd_vel (velocity control) message
import miro2 as miro # Import MiRo Developer Kit library
try: # For convenience, import this util separately
from miro2.lib import wheel_speed2cmd_vel # Python 3
except ImportError:
from miro2.utils import wheel_speed2cmd_vel # Python 2
##########################
For convenience, imports are divided into sections, going from the standard and widely-used packages, to ROS and ROS-specific messages, and then to the MDK, built on top of ROS specifically to interface with the MiRo robot.
The final import is done separately, since it's sensitive to the Python version; there were changes in the MDK structure when moving from 2 to 3.
The main bulk of the program is contained in the attributes and functions of the MiRoClient
class.
Class constants (attributes in capital letters) come first and are used to set various parameters of the script. For example, changing DEBUG
to True
will display intermediate steps in the image processing. Try changing a few values!
TICK = 0.01 # This is the update interval for the main control loop in secs
CAM_FREQ = 1 # Number of ticks before camera gets a new frame, increase in case of network lag
NODE_EXISTS = True # Disables (True) / Enables (False) rospy.init_node
SLOW = 0.1 # Radial speed when turning on the spot (rad/s)
FAST = 0.4 # Linear speed when kicking the ball (m/s)
DEBUG = False # Set to True to enable debug views of the cameras
This is very the first thing that happens when the script is called. Here, we set up publishers, subscribers and their callbacks that are required to communicate with other nodes on the ROS network, as well as some variables to help transport information between topics.
def __init__(self):
# Initialise a new ROS node to communicate with MiRo
if not self.NODE_EXISTS:
rospy.init_node("kick_blue_ball", anonymous=True)
# Give it some time to make sure everything is initialised
rospy.sleep(2.0)
# Initialise CV Bridge
self.image_converter = CvBridge()
# Individual robot name acts as ROS topic prefix
topic_base_name = "/" + os.getenv("MIRO_ROBOT_NAME")
# Create two new subscribers to recieve camera images with attached callbacks
self.sub_caml = rospy.Subscriber(
topic_base_name + "/sensors/caml/compressed",
CompressedImage,
self.callback_caml,
queue_size=1,
tcp_nodelay=True,
)
self.sub_camr = rospy.Subscriber(
topic_base_name + "/sensors/camr/compressed",
CompressedImage,
self.callback_camr,
queue_size=1,
tcp_nodelay=True,
)
# Create a new publisher to send velocity commands to the robot
self.vel_pub = rospy.Publisher(
topic_base_name + "/control/cmd_vel", TwistStamped, queue_size=0
)
# Create a new publisher to move the robot head
self.pub_kin = rospy.Publisher(
topic_base_name + "/control/kinematic_joints", JointState, queue_size=0
)
# Create handle to store images
self.input_camera = [None, None]
# New frame notification
self.new_frame = [False, False]
# Create variable to store a list of ball's x, y, and r values for each camera
self.ball = [None, None]
# Set the default frame width (gets updated on receiving an image)
self.frame_width = 640
# Action selector to reduce duplicate printing to the terminal
self.just_switched = True
# Bookmark
self.bookmark = 0
# Move the head to default pose
self.reset_head_pose()
The reset_head_pose
function makes sure that MiRo's head is not tilted to any one side.
If that would be the case, getting the correct position of the ball on the image would have to account also for the roll angle of the cameras.
In the main loop, MiRo cycles between three behaviours via the status_code
variable:
- Turning on the spot if it doesn't see a blue ball.
- Once MiRo caught the sight of it with at least one of the cameras, MiRo turns so that the ball is dead centre.
- MiRo moves forward to try and hit the ball with its body.
Upon successful completion of stage 3 or in case it fails any of the stages, MiRo goes back to stage 1.
The main loop on its own can run as fast as the CPU allows (which is very fast), but in practical terms the usefulness is limited by the frequency of the robot's sensor reading updates.
This is why you can see the rospy.sleep(self.TICK)
added at the end.
print("MiRo plays ball, press CTRL+C to halt...")
# Main control loop iteration counter
self.counter = 0
# This switch loops through MiRo behaviours:
# Find ball, lock on to the ball and kick ball
self.status_code = 0
while not rospy.core.is_shutdown():
# Step 1. Find ball
if self.status_code == 1:
# Every once in a while, look for ball
if self.counter % self.CAM_FREQ == 0:
self.look_for_ball()
# Step 2. Orient towards it
elif self.status_code == 2:
self.lock_onto_ball()
# Step 3. Kick!
elif self.status_code == 3:
self.kick()
# Fall back
else:
self.status_code = 1
# Yield
self.counter += 1
rospy.sleep(self.TICK)
First thing that happens upon look_for_ball
action selection is lowering of the just_switched
flag to prevent printing the same message over and over again.
Then, MiRo checks if a new frame has arrived from the cameras (remember, those are unrelated and asynchronous events!), and if so, calls the detect_ball
function (which will be explained later).
If, indeed, there is a blue ball on the image, the MiRo switches to the next action; otherwise it simply rotates clockwise in hopes of finding one.
def look_for_ball(self):
"""
[1 of 3] Rotate MiRo if it doesn't see a ball in its current
position, until it sees one.
"""
if self.just_switched: # Print once
print("MiRo is looking for the ball...")
self.just_switched = False
for index in range(2): # For each camera (0 = left, 1 = right)
# Skip if there's no new image, in case the network is choking
if not self.new_frame[index]:
continue
image = self.input_camera[index]
# Run the detect ball procedure
self.ball[index] = self.detect_ball(image, index)
# If no ball has been detected
if not self.ball[0] and not self.ball[1]:
self.drive(self.SLOW, -self.SLOW)
else:
self.status_code = 2 # Switch to the second action
self.just_switched = True
The second action, lock_onto_ball
is very similar to the first one, but with added conditions that check which cameras see the ball.
Most importantly, if the ball is seen with both cameras, MiRo tries to tweak its orientation so that the ball is directly in front of it.
This procedure uses the horizontal coordinate of centre of the ball on both frames.
Once these values are within acceptable error range, MiRo switches to the third and final action.
def lock_onto_ball(self, error=25):
"""
[2 of 3] Once a ball has been detected, turn MiRo to face it
"""
if self.just_switched: # Print once
print("MiRo is locking on to the ball")
self.just_switched = False
for index in range(2): # For each camera (0 = left, 1 = right)
# Skip if there's no new image, in case the network is choking
if not self.new_frame[index]:
continue
image = self.input_camera[index]
# Run the detect ball procedure
self.ball[index] = self.detect_ball(image, index)
# If only the right camera sees the ball, rotate clockwise
if not self.ball[0] and self.ball[1]:
self.drive(self.SLOW, -self.SLOW)
# Conversely, rotate counterclockwise
elif self.ball[0] and not self.ball[1]:
self.drive(-self.SLOW, self.SLOW)
# Make the MiRo face the ball if it's visible with both cameras
elif self.ball[0] and self.ball[1]:
error = 0.05 # 5% of image width
# Use the normalised values
left_x = self.ball[0][0] # should be in range [0.0, 0.5]
right_x = self.ball[1][0] # should be in range [-0.5, 0.0]
rotation_speed = 0.03 # Turn even slower now
if abs(left_x) - abs(right_x) > error:
self.drive(rotation_speed, -rotation_speed) # turn clockwise
elif abs(left_x) - abs(right_x) < -error:
self.drive(-rotation_speed, rotation_speed) # turn counterclockwise
else:
# Successfully turned to face the ball
self.status_code = 3 # Switch to the third action
self.just_switched = True
self.bookmark = self.counter
# Otherwise, the ball is lost :-(
else:
self.status_code = 0 # Go back to square 1...
print("MiRo has lost the ball...")
self.just_switched = True
The last action is where MiRo (metaphorically) kicks the ball, by simply driving into it.
The "drive forward" movement lasts for 2 seconds, irrespective of the self.TICK
value.
If the ball wasn't reached on the first try, the assumption is that it didn't roll very far away (and hopefully thus is still centred), and so MiRo can simply repeat this movement.
def kick(self):
"""
[3 of 3] Once MiRO is in position, this function should drive the MiRo
forward until it kicks the ball!
"""
if self.just_switched:
print("MiRo is kicking the ball!")
self.just_switched = False
if self.counter <= self.bookmark + 2 / self.TICK:
self.drive(self.FAST, self.FAST)
else:
self.status_code = 0 # Back to the default state after the kick
self.just_switched = True
Finally, we'll look at the details of the image processing using the OpenCV library that helps detect the ball.
- It's important to remember that OpenCV's default colour space is BGR, and conversion to BGR needs to be done first.
- As was found empirically, the blue toy ball and blue colour in general is more distinct in the HSV (hue, saturation, value) space.
- The variance in the desired colour is transformed into a corresponding hue range. Based on this range, a mask is generated and subtracted from the image, leaving only the desired hue range.
- The image is smoothed to reduce noise and other artefacts.
- Hough Circle Transform is applied to get a list of circles on the image, together with their positions and radii.
- The biggest circle present is selected, and its normalised position and radius are returned as the function output.
def detect_ball(self, frame, index):
"""
Image processing operations, fine-tuned to detect a small,
toy blue ball in a given frame.
"""
if frame is None: # Sanity check
return
# Debug window to show the frame
if self.DEBUG:
cv2.imshow("camera" + str(index), frame)
cv2.waitKey(1)
# Flag this frame as processed, so that it's not reused in case of lag
self.new_frame[index] = False
# Get image in HSV (hue, saturation, value) colour format
im_hsv = cv2.cvtColor(frame, cv2.COLOR_RGB2HSV)
# Specify target ball colour
rgb_colour = np.uint8([[[255, 0, 0]]]) # e.g. Blue (Note: BGR)
# Convert this colour to HSV colour model
hsv_colour = cv2.cvtColor(rgb_colour, cv2.COLOR_RGB2HSV)
# Extract colour boundaries for masking image
# Get the hue value from the numpy array containing target colour
target_hue = hsv_colour[0, 0][0]
hsv_lo_end = np.array([target_hue - 20, 70, 70])
hsv_hi_end = np.array([target_hue + 20, 255, 255])
# Generate the mask based on the desired hue range
mask = cv2.inRange(im_hsv, hsv_lo_end, hsv_hi_end)
mask_on_image = cv2.bitwise_and(im_hsv, im_hsv, mask=mask)
# Debug window to show the mask
if self.DEBUG:
cv2.imshow("mask" + str(index), mask_on_image)
cv2.waitKey(1)
# Clean up the image
seg = mask
seg = cv2.GaussianBlur(seg, (5, 5), 0)
seg = cv2.erode(seg, None, iterations=2)
seg = cv2.dilate(seg, None, iterations=2)
# Fine-tune parameters
ball_detect_min_dist_between_cens = 40 # Empirical
canny_high_thresh = 10 # Empirical
ball_detect_sensitivity = 10 # Lower detects more circles, so it's a trade-off
ball_detect_min_radius = 5 # Arbitrary, empirical
ball_detect_max_radius = 50 # Arbitrary, empirical
# Find circles using OpenCV routine
# This function returns a list of circles, with their x, y and r values
circles = cv2.HoughCircles(
seg,
cv2.HOUGH_GRADIENT,
1,
ball_detect_min_dist_between_cens,
param1=canny_high_thresh,
param2=ball_detect_sensitivity,
minRadius=ball_detect_min_radius,
maxRadius=ball_detect_max_radius,
)
if circles is None:
# If no circles were found, just display the original image
return
# Get the largest circle
max_circle = None
self.max_rad = 0
circles = np.uint16(np.around(circles))
for c in circles[0, :]:
if c[2] > self.max_rad:
self.max_rad = c[2]
max_circle = c
# This shouldn't happen, but you never know...
if max_circle is None:
return
# Append detected circle and its centre to the frame
cv2.circle(frame, (max_circle[0], max_circle[1]), max_circle[2], (0, 255, 0), 2)
cv2.circle(frame, (max_circle[0], max_circle[1]), 2, (0, 0, 255), 3)
if self.DEBUG:
cv2.imshow("circles" + str(index), frame)
cv2.waitKey(1)
# Normalise values to: x,y = [-0.5, 0.5], r = [0, 1]
max_circle = np.array(max_circle).astype("float32")
max_circle[0] -= self.x_centre
max_circle[0] /= self.frame_width
max_circle[1] -= self.y_centre
max_circle[1] /= self.frame_width
max_circle[1] *= -1.0
max_circle[2] /= self.frame_width
# Return a list values [x, y, r] for the largest circle
return [max_circle[0], max_circle[1], max_circle[2]]
Hopefully, this worked example helped you get to grips with the structure and flow of a typical MiRo program.
Now would be a good time to reinforce what you've learned so far with some practice.
Using the code in the kick_blue_ball.py
script as a template, try making the following the changes/improvements to MiRo's behaviour in that scenario:
- As it is, MiRo always only turns right if it doesn't see a ball. Help MiRo memorise on which side the ball was last seen and turn in that direction.
- Sometimes just turning on the spot is not good enough. Add some short movements in a random direction to MiRo's search behaviour, in hopes of getting a better vantage point.
Don't forget to add cliff sensor checks in case MiRo is on an elevated surface!
- Try merging the turning and moving forward into one single movement. This might be especially useful in the enclosed MiRo Arena, as the ball can keep moving for a quite a long time after a good, clean kick.
You can edit the provided script or create new Python scripts in the same folder.
These can then also be run using the same rosrun
method.
=====
This completes the tutorial for the Week 1 lab session.
Remember to save your files and settings with wsl_ros backup
once you're done!
Navigating This Wiki:
← Getting Started |
Week 2: Working with the Real MiRos →
COM3528 Cognitive and Biomimetic Robotics
Academic Year 2021–2022
Alex Lucas | Department of Computer Science | The University of Sheffield
(Last updated: 24/03/2022)
The documentation within this Wiki is licensed under Creative Commons License CC BY-NC:
You are free to distribute, remix, adapt, and build upon this work (for non-commercial purposes only) as long as credit is given to the original author.