Chapter 2: ROS2 Communication Patterns

Overview

ROS2 provides multiple communication patterns for different use cases. This chapter covers all communication mechanisms in depth.

Communication Types

graph LR
    A[Topics] -->|Pub-Sub| B[Continuous Data]
    C[Services] -->|Request-Reply| D[One-time Tasks]
    E[Actions] -->|Goal-Feedback-Result| F[Long Tasks]
    G[Parameters] -->|Configuration| H[Node Settings]
    
    style A fill:#C9B59C
    style C fill:#C9B59C
    style E fill:#C9B59C
    style G fill:#C9B59C

Topics (Publish-Subscribe)

When to Use Topics

  • Continuous data streams (sensor readings, robot state)
  • One-to-many communication
  • Asynchronous data flow

Quality of Service (QoS)

from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy

# Sensor data QoS (best effort, keep last)
sensor_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=10
)

# Command QoS (reliable, keep all)
command_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_ALL,
    depth=100
)

# Create publisher with QoS
self.publisher = self.create_publisher(
    LaserScan,
    'scan',
    sensor_qos
)

Custom Messages

Create msg/RobotState.msg:

# Robot state message
string robot_id
float64 battery_level
float64 x_position
float64 y_position
float64 orientation
string[] active_tasks

Use in code:

from my_robot_interfaces.msg import RobotState

class StatePublisher(Node):
    def __init__(self):
        super().__init__('state_publisher')
        self.publisher = self.create_publisher(RobotState, 'robot_state', 10)
        
    def publish_state(self):
        msg = RobotState()
        msg.robot_id = 'robot_01'
        msg.battery_level = 85.5
        msg.x_position = 1.5
        msg.y_position = 2.3
        msg.orientation = 0.785  # radians
        msg.active_tasks = ['navigate', 'avoid_obstacles']
        
        self.publisher.publish(msg)

Services (Request-Response)

When to Use Services

  • One-time requests (take photo, calculate path)
  • Synchronous operations
  • Guaranteed response

Service Example

Create srv/CalculatePath.srv:

# Request
float64 start_x
float64 start_y
float64 goal_x
float64 goal_y
---
# Response
float64[] path_x
float64[] path_y
float64 path_length
bool success
string message

Service Server:

from my_robot_interfaces.srv import CalculatePath

class PathPlannerService(Node):
    def __init__(self):
        super().__init__('path_planner')
        self.srv = self.create_service(
            CalculatePath,
            'calculate_path',
            self.calculate_path_callback
        )
        
    def calculate_path_callback(self, request, response):
        # Simple straight-line path
        response.path_x = [request.start_x, request.goal_x]
        response.path_y = [request.start_y, request.goal_y]
        
        # Calculate distance
        dx = request.goal_x - request.start_x
        dy = request.goal_y - request.start_y
        response.path_length = (dx**2 + dy**2)**0.5
        
        response.success = True
        response.message = f'Path calculated: {response.path_length:.2f}m'
        
        self.get_logger().info(response.message)
        return response

Service Client:

from my_robot_interfaces.srv import CalculatePath

class NavigationClient(Node):
    def __init__(self):
        super().__init__('navigation_client')
        self.client = self.create_client(CalculatePath, 'calculate_path')
        
        # Wait for service
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for path planner service...')
    
    def send_goal(self, start_x, start_y, goal_x, goal_y):
        request = CalculatePath.Request()
        request.start_x = start_x
        request.start_y = start_y
        request.goal_x = goal_x
        request.goal_y = goal_y
        
        future = self.client.call_async(request)
        future.add_done_callback(self.response_callback)
    
    def response_callback(self, future):
        try:
            response = future.result()
            if response.success:
                self.get_logger().info(f'Path: {response.message}')
            else:
                self.get_logger().error('Path planning failed')
        except Exception as e:
            self.get_logger().error(f'Service call failed: {e}')

Actions (Goal-Feedback-Result)

When to Use Actions

  • Long-running tasks (navigation, manipulation)
  • Progress feedback needed
  • Cancelable operations

Action Example

Create action/Navigate.action:

# Goal
float64 target_x
float64 target_y
---
# Result
bool success
float64 final_x
float64 final_y
float64 distance_traveled
---
# Feedback
float64 current_x
float64 current_y
float64 distance_remaining

Action Server:

import time
from rclpy.action import ActionServer
from my_robot_interfaces.action import Navigate

class NavigationActionServer(Node):
    def __init__(self):
        super().__init__('navigation_action_server')
        self._action_server = ActionServer(
            self,
            Navigate,
            'navigate',
            self.execute_callback
        )
    
    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing navigation...')
        
        feedback_msg = Navigate.Feedback()
        result = Navigate.Result()
        
        # Simulate navigation
        current_x, current_y = 0.0, 0.0
        target_x = goal_handle.request.target_x
        target_y = goal_handle.request.target_y
        
        steps = 10
        for i in range(steps):
            # Check if canceled
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                return Navigate.Result()
            
            # Update position
            current_x += (target_x - current_x) / (steps - i)
            current_y += (target_y - current_y) / (steps - i)
            
            # Publish feedback
            feedback_msg.current_x = current_x
            feedback_msg.current_y = current_y
            dx = target_x - current_x
            dy = target_y - current_y
            feedback_msg.distance_remaining = (dx**2 + dy**2)**0.5
            
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(0.5)
        
        # Set result
        goal_handle.succeed()
        result.success = True
        result.final_x = current_x
        result.final_y = current_y
        result.distance_traveled = (target_x**2 + target_y**2)**0.5
        
        return result

Action Client:

from rclpy.action import ActionClient
from my_robot_interfaces.action import Navigate

class NavigationActionClient(Node):
    def __init__(self):
        super().__init__('navigation_action_client')
        self._action_client = ActionClient(self, Navigate, 'navigate')
    
    def send_goal(self, x, y):
        goal_msg = Navigate.Goal()
        goal_msg.target_x = x
        goal_msg.target_y = y
        
        self._action_client.wait_for_server()
        
        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )
        
        self._send_goal_future.add_done_callback(self.goal_response_callback)
    
    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info(
            f'Position: ({feedback.current_x:.2f}, {feedback.current_y:.2f}), '
            f'Remaining: {feedback.distance_remaining:.2f}m'
        )
    
    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return
        
        self.get_logger().info('Goal accepted')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)
    
    def get_result_callback(self, future):
        result = future.result().result
        if result.success:
            self.get_logger().info(
                f'Navigation complete! Traveled {result.distance_traveled:.2f}m'
            )

Parameters

Dynamic Configuration

from rcl_interfaces.msg import ParameterDescriptor

class ConfigurableNode(Node):
    def __init__(self):
        super().__init__('configurable_node')
        
        # Declare parameters with descriptors
        self.declare_parameter(
            'max_speed',
            1.0,
            ParameterDescriptor(description='Maximum robot speed (m/s)')
        )
        
        self.declare_parameter(
            'robot_name',
            'robot_01',
            ParameterDescriptor(description='Robot identifier')
        )
        
        # Get parameters
        self.max_speed = self.get_parameter('max_speed').value
        self.robot_name = self.get_parameter('robot_name').value
        
        # Add parameter callback
        self.add_on_set_parameters_callback(self.parameter_callback)
    
    def parameter_callback(self, params):
        for param in params:
            if param.name == 'max_speed':
                if 0.0 < param.value <= 5.0:
                    self.max_speed = param.value
                    self.get_logger().info(f'Max speed updated: {param.value}')
                else:
                    return SetParametersResult(successful=False)
        
        return SetParametersResult(successful=True)

Set parameters from command line:

ros2 param set /configurable_node max_speed 2.5
ros2 param get /configurable_node max_speed
ros2 param list /configurable_node

Key Takeaways

  1. āœ… Topics for continuous data streams
  2. āœ… Services for request-response
  3. āœ… Actions for long-running tasks with feedback
  4. āœ… Parameters for dynamic configuration
  5. āœ… QoS controls reliability and performance
  6. āœ… Custom messages for domain-specific data

Next Chapter

Chapter 3: Robot Simulation with Gazebo and URDF!