Chapter 2: ROS2 Communication Patterns
Overview
ROS2 provides multiple communication patterns for different use cases. This chapter covers all communication mechanisms in depth.
Communication Types
graph LR
A[Topics] -->|Pub-Sub| B[Continuous Data]
C[Services] -->|Request-Reply| D[One-time Tasks]
E[Actions] -->|Goal-Feedback-Result| F[Long Tasks]
G[Parameters] -->|Configuration| H[Node Settings]
style A fill:#C9B59C
style C fill:#C9B59C
style E fill:#C9B59C
style G fill:#C9B59C
Topics (Publish-Subscribe)
When to Use Topics
- Continuous data streams (sensor readings, robot state)
- One-to-many communication
- Asynchronous data flow
Quality of Service (QoS)
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
# Sensor data QoS (best effort, keep last)
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
# Command QoS (reliable, keep all)
command_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_ALL,
depth=100
)
# Create publisher with QoS
self.publisher = self.create_publisher(
LaserScan,
'scan',
sensor_qos
)
Custom Messages
Create msg/RobotState.msg:
# Robot state message
string robot_id
float64 battery_level
float64 x_position
float64 y_position
float64 orientation
string[] active_tasks
Use in code:
from my_robot_interfaces.msg import RobotState
class StatePublisher(Node):
def __init__(self):
super().__init__('state_publisher')
self.publisher = self.create_publisher(RobotState, 'robot_state', 10)
def publish_state(self):
msg = RobotState()
msg.robot_id = 'robot_01'
msg.battery_level = 85.5
msg.x_position = 1.5
msg.y_position = 2.3
msg.orientation = 0.785 # radians
msg.active_tasks = ['navigate', 'avoid_obstacles']
self.publisher.publish(msg)
Services (Request-Response)
When to Use Services
- One-time requests (take photo, calculate path)
- Synchronous operations
- Guaranteed response
Service Example
Create srv/CalculatePath.srv:
# Request
float64 start_x
float64 start_y
float64 goal_x
float64 goal_y
---
# Response
float64[] path_x
float64[] path_y
float64 path_length
bool success
string message
Service Server:
from my_robot_interfaces.srv import CalculatePath
class PathPlannerService(Node):
def __init__(self):
super().__init__('path_planner')
self.srv = self.create_service(
CalculatePath,
'calculate_path',
self.calculate_path_callback
)
def calculate_path_callback(self, request, response):
# Simple straight-line path
response.path_x = [request.start_x, request.goal_x]
response.path_y = [request.start_y, request.goal_y]
# Calculate distance
dx = request.goal_x - request.start_x
dy = request.goal_y - request.start_y
response.path_length = (dx**2 + dy**2)**0.5
response.success = True
response.message = f'Path calculated: {response.path_length:.2f}m'
self.get_logger().info(response.message)
return response
Service Client:
from my_robot_interfaces.srv import CalculatePath
class NavigationClient(Node):
def __init__(self):
super().__init__('navigation_client')
self.client = self.create_client(CalculatePath, 'calculate_path')
# Wait for service
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for path planner service...')
def send_goal(self, start_x, start_y, goal_x, goal_y):
request = CalculatePath.Request()
request.start_x = start_x
request.start_y = start_y
request.goal_x = goal_x
request.goal_y = goal_y
future = self.client.call_async(request)
future.add_done_callback(self.response_callback)
def response_callback(self, future):
try:
response = future.result()
if response.success:
self.get_logger().info(f'Path: {response.message}')
else:
self.get_logger().error('Path planning failed')
except Exception as e:
self.get_logger().error(f'Service call failed: {e}')
Actions (Goal-Feedback-Result)
When to Use Actions
- Long-running tasks (navigation, manipulation)
- Progress feedback needed
- Cancelable operations
Action Example
Create action/Navigate.action:
# Goal
float64 target_x
float64 target_y
---
# Result
bool success
float64 final_x
float64 final_y
float64 distance_traveled
---
# Feedback
float64 current_x
float64 current_y
float64 distance_remaining
Action Server:
import time
from rclpy.action import ActionServer
from my_robot_interfaces.action import Navigate
class NavigationActionServer(Node):
def __init__(self):
super().__init__('navigation_action_server')
self._action_server = ActionServer(
self,
Navigate,
'navigate',
self.execute_callback
)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing navigation...')
feedback_msg = Navigate.Feedback()
result = Navigate.Result()
# Simulate navigation
current_x, current_y = 0.0, 0.0
target_x = goal_handle.request.target_x
target_y = goal_handle.request.target_y
steps = 10
for i in range(steps):
# Check if canceled
if goal_handle.is_cancel_requested:
goal_handle.canceled()
return Navigate.Result()
# Update position
current_x += (target_x - current_x) / (steps - i)
current_y += (target_y - current_y) / (steps - i)
# Publish feedback
feedback_msg.current_x = current_x
feedback_msg.current_y = current_y
dx = target_x - current_x
dy = target_y - current_y
feedback_msg.distance_remaining = (dx**2 + dy**2)**0.5
goal_handle.publish_feedback(feedback_msg)
time.sleep(0.5)
# Set result
goal_handle.succeed()
result.success = True
result.final_x = current_x
result.final_y = current_y
result.distance_traveled = (target_x**2 + target_y**2)**0.5
return result
Action Client:
from rclpy.action import ActionClient
from my_robot_interfaces.action import Navigate
class NavigationActionClient(Node):
def __init__(self):
super().__init__('navigation_action_client')
self._action_client = ActionClient(self, Navigate, 'navigate')
def send_goal(self, x, y):
goal_msg = Navigate.Goal()
goal_msg.target_x = x
goal_msg.target_y = y
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(
f'Position: ({feedback.current_x:.2f}, {feedback.current_y:.2f}), '
f'Remaining: {feedback.distance_remaining:.2f}m'
)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
if result.success:
self.get_logger().info(
f'Navigation complete! Traveled {result.distance_traveled:.2f}m'
)
Parameters
Dynamic Configuration
from rcl_interfaces.msg import ParameterDescriptor
class ConfigurableNode(Node):
def __init__(self):
super().__init__('configurable_node')
# Declare parameters with descriptors
self.declare_parameter(
'max_speed',
1.0,
ParameterDescriptor(description='Maximum robot speed (m/s)')
)
self.declare_parameter(
'robot_name',
'robot_01',
ParameterDescriptor(description='Robot identifier')
)
# Get parameters
self.max_speed = self.get_parameter('max_speed').value
self.robot_name = self.get_parameter('robot_name').value
# Add parameter callback
self.add_on_set_parameters_callback(self.parameter_callback)
def parameter_callback(self, params):
for param in params:
if param.name == 'max_speed':
if 0.0 < param.value <= 5.0:
self.max_speed = param.value
self.get_logger().info(f'Max speed updated: {param.value}')
else:
return SetParametersResult(successful=False)
return SetParametersResult(successful=True)
Set parameters from command line:
ros2 param set /configurable_node max_speed 2.5
ros2 param get /configurable_node max_speed
ros2 param list /configurable_node
Key Takeaways
- ā Topics for continuous data streams
- ā Services for request-response
- ā Actions for long-running tasks with feedback
- ā Parameters for dynamic configuration
- ā QoS controls reliability and performance
- ā Custom messages for domain-specific data
Next Chapter
Chapter 3: Robot Simulation with Gazebo and URDF!