Lab 2: Services and Custom Messages
Objective
Learn to create ROS2 services and define custom message types for robot control.
Learning Outcomes
- ✅ Create custom message definitions
- ✅ Implement service servers
- ✅ Build service clients
- ✅ Handle request-response patterns
Estimated Time
⏱️ 3 hours
Part 1: Create Custom Messages (45 min)
Create Interface Package
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_cmake robot_interfaces
cd robot_interfaces
mkdir msg srv
Define Custom Message
Create msg/BatteryStatus.msg:
float32 voltage
float32 current
float32 percentage
bool is_charging
string status
Define Service
Create srv/SetSpeed.srv:
# Request
float32 linear_speed
float32 angular_speed
---
# Response
bool success
string message
Configure CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/BatteryStatus.msg"
"srv/SetSpeed.srv"
)
Build
cd ~/ros2_ws
colcon build --packages-select robot_interfaces
source install/setup.bash
Part 2: Service Server (60 min)
Create robot_control/speed_service.py:
import rclpy
from rclpy.node import Node
from robot_interfaces.srv import SetSpeed
class SpeedService(Node):
def __init__(self):
super().__init__('speed_service')
self.srv = self.create_service(
SetSpeed,
'set_speed',
self.set_speed_callback
)
self.current_linear = 0.0
self.current_angular = 0.0
self.get_logger().info('Speed service ready')
def set_speed_callback(self, request, response):
# Validate speeds
if abs(request.linear_speed) > 2.0:
response.success = False
response.message = 'Linear speed too high (max 2.0 m/s)'
return response
if abs(request.angular_speed) > 3.0:
response.success = False
response.message = 'Angular speed too high (max 3.0 rad/s)'
return response
# Set speeds
self.current_linear = request.linear_speed
self.current_angular = request.angular_speed
response.success = True
response.message = f'Speed set: linear={request.linear_speed}, angular={request.angular_speed}'
self.get_logger().info(response.message)
return response
Part 3: Service Client (60 min)
Create robot_control/speed_client.py:
import rclpy
from rclpy.node import Node
from robot_interfaces.srv import SetSpeed
class SpeedClient(Node):
def __init__(self):
super().__init__('speed_client')
self.client = self.create_client(SetSpeed, 'set_speed')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for service...')
def send_request(self, linear, angular):
request = SetSpeed.Request()
request.linear_speed = linear
request.angular_speed = angular
future = self.client.call_async(request)
rclpy.spin_until_future_complete(self, future)
return future.result()
def main():
rclpy.init()
client = SpeedClient()
# Test different speeds
result = client.send_request(1.0, 0.5)
print(f'Result: {result.success}, {result.message}')
result = client.send_request(5.0, 0.0) # Should fail
print(f'Result: {result.success}, {result.message}')
client.destroy_node()
rclpy.shutdown()
Deliverables
- Custom message and service definitions
- Working service server
- Working service client
- Test results showing validation
Time: 3 hours