Skip to content

Instantly share code, notes, and snippets.

@anasAlsalol
Last active January 26, 2026 09:31
Show Gist options
  • Select an option

  • Save anasAlsalol/02bcca64452fade5bb48f62cad5f1e85 to your computer and use it in GitHub Desktop.

Select an option

Save anasAlsalol/02bcca64452fade5bb48f62cad5f1e85 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
TurtleBot3 Obstacle Avoidance Node
Subscribes to /scan (LiDAR) and publishes to /cmd_vel
Implements: Forward movement, obstacle detection, collision avoidance
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
import numpy as np
class ObstacleAvoider(Node):
def __init__(self):
super().__init__('obstacle_avoider')
# Publisher for velocity commands
self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
# Subscriber to LiDAR data
self.subscription = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10)
# Control parameters
self.linear_speed = 0.15 # m/s - forward speed
self.angular_speed = 0.5 # rad/s - turning speed
self.min_safe_distance = 0.5 # meters - stop distance
self.turn_threshold = 0.8 # meters - start turning distance
# State variables
self.obstacle_detected = False
self.turn_direction = 0 # -1: left, 1: right
self.last_turn_time = self.get_clock().now()
# Scan data storage
self.scan_ranges = []
self.scan_available = False
# Timer for periodic control
self.timer = self.create_timer(0.1, self.control_loop) # 10 Hz
self.get_logger().info('🎯 Obstacle Avoider Node Started')
self.get_logger().info(f'Safe distance: {self.min_safe_distance}m')
self.get_logger().info(f'Forward speed: {self.linear_speed}m/s')
self.get_logger().info('Ready to navigate!')
def scan_callback(self, msg):
"""Process incoming LiDAR data"""
self.scan_ranges = list(msg.ranges)
self.scan_available = True
# Replace infinite values with max range
for i in range(len(self.scan_ranges)):
if np.isinf(self.scan_ranges[i]):
self.scan_ranges[i] = msg.range_max
def check_obstacles(self):
"""Check for obstacles in front of robot"""
if not self.scan_available or len(self.scan_ranges) == 0:
return False, 0
# Define front sector (45 degrees centered on front)
num_readings = len(self.scan_ranges)
front_start = int(num_readings * 0.4375) # 315 degrees
front_end = int(num_readings * 0.5625) # 45 degrees
# Extract front readings
front_readings = self.scan_ranges[front_start:front_end]
# Check for obstacles
min_distance = min(front_readings) if front_readings else float('inf')
if min_distance < self.min_safe_distance:
self.get_logger().warning(f'🚨 Obstacle detected at {min_distance:.2f}m')
return True, min_distance
elif min_distance < self.turn_threshold:
self.get_logger().info(f'⚠️ Close obstacle at {min_distance:.2f}m')
return False, min_distance
return False, min_distance
def choose_turn_direction(self):
"""Choose best direction to turn based on side obstacles"""
if not self.scan_available:
return 1 # Default to right turn
num_readings = len(self.scan_ranges)
# Left side (90-135 degrees)
left_start = int(num_readings * 0.25) # 90 degrees
left_end = int(num_readings * 0.375) # 135 degrees
# Right side (225-270 degrees)
right_start = int(num_readings * 0.625) # 225 degrees
right_end = int(num_readings * 0.75) # 270 degrees
left_readings = self.scan_ranges[left_start:left_end]
right_readings = self.scan_ranges[right_start:right_end]
# Calculate average distances
left_avg = np.mean(left_readings) if left_readings else float('inf')
right_avg = np.mean(right_readings) if right_readings else float('inf')
# Choose direction with more space
if left_avg > right_avg:
self.get_logger().info('🔄 Turning LEFT (more space)')
return -1 # Left turn
else:
self.get_logger().info('🔄 Turning RIGHT (more space)')
return 1 # Right turn
def control_loop(self):
"""Main control loop - called by timer"""
if not self.scan_available:
return
# Create Twist message
twist_msg = Twist()
# Check for obstacles
obstacle_detected, distance = self.check_obstacles()
if obstacle_detected:
# STOP - obstacle too close
twist_msg.linear.x = 0.0
twist_msg.angular.z = 0.0
# Choose turn direction
self.turn_direction = self.choose_turn_direction()
self.last_turn_time = self.get_clock().now()
self.get_logger().warning(f'🛑 STOPPED - Obstacle at {distance:.2f}m')
elif distance < self.turn_threshold:
# SLOW DOWN and prepare to turn
twist_msg.linear.x = self.linear_speed * 0.5 # Reduced speed
twist_msg.angular.z = 0.0
self.get_logger().info(f'🐢 SLOWING - Obstacle at {distance:.2f}m')
else:
# NORMAL FORWARD MOVEMENT
twist_msg.linear.x = self.linear_speed
twist_msg.angular.z = 0.0
# Log periodically
current_time = self.get_clock().now()
if (current_time - self.last_turn_time).nanoseconds > 3e9: # Every 3 seconds
self.get_logger().info(f'🚀 Moving FORWARD - Clear path ahead')
self.last_turn_time = current_time
# Publish velocity command
self.publisher_.publish(twist_msg)
def emergency_stop(self):
"""Emergency stop function"""
twist_msg = Twist()
twist_msg.linear.x = 0.0
twist_msg.angular.z = 0.0
self.publisher_.publish(twist_msg)
self.get_logger().critical('🛑 EMERGENCY STOP ACTIVATED')
def main(args=None):
rclpy.init(args=args)
try:
node = ObstacleAvoider()
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Shutting down obstacle avoider...')
node.emergency_stop()
except Exception as e:
node.get_logger().error(f'Error: {e}')
if 'node' in locals():
node.emergency_stop()
finally:
if 'node' in locals():
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/bin/bash
# FIXED Task 4 Test Script - Proper movement detection
echo "=== TASK 4: OBSTACLE AVOIDANCE TEST ==="
echo "Duration: 2 minutes"
# Create directory
TEST_DIR="$HOME/task4_test_$(date +%Y%m%d_%H%M%S)"
mkdir -p "$TEST_DIR"
cd "$TEST_DIR"
echo "Test directory: $TEST_DIR"
# Start bag recording
echo "Starting bag recording..."
ros2 bag record -o task4_bag -a &
BAG_PID=$!
# Start individual topic recording (RAW format for better parsing)
echo "Starting topic recording..."
ros2 topic echo /scan > scan.txt &
SCAN_PID=$!
ros2 topic echo /cmd_vel > cmd_vel.txt &
CMD_PID=$!
ros2 topic echo /odom > odom.txt &
ODOM_PID=$!
# Record system info
echo "Recording system information..."
date > test_info.txt
ros2 --version >> test_info.txt
echo "TURTLEBOT3_MODEL=$TURTLEBOT3_MODEL" >> test_info.txt
ros2 node list > nodes.txt
ros2 topic list > topics.txt
echo "Test starting in 5 seconds..."
sleep 5
# Run the node (in background to capture output)
echo "Starting obstacle avoidance node..."
ros2 run tb3_motion_control simple_avoidance 2>&1 | tee node_output.txt &
NODE_PID=$!
echo "Test running... (120 seconds)"
for i in {1..120}; do
if (( i % 30 == 0 )); then
echo "Elapsed: ${i}s / 120s"
fi
sleep 1
done
echo "Test complete!"
echo "Stopping recordings..."
# Stop everything
kill $NODE_PID 2>/dev/null
sleep 2
kill $BAG_PID $SCAN_PID $CMD_PID $ODOM_PID 2>/dev/null
sleep 2
# Generate summary with PROPER movement detection
echo "Generating summary..."
echo "=== TEST SUMMARY ===" > summary.txt
echo "Directory: $TEST_DIR" >> summary.txt
echo "Duration: 120 seconds" >> summary.txt
echo "" >> summary.txt
# Check cmd_vel data PROPERLY
if [ -s cmd_vel.txt ]; then
CMD_COUNT=$(wc -l < cmd_vel.txt)
echo "Commands recorded: $CMD_COUNT" >> summary.txt
# FIXED: Proper movement detection
# Count non-zero linear.x commands
NON_ZERO_COUNT=$(grep -c '"x": 0\.[0-9]' cmd_vel.txt 2>/dev/null || echo "0")
ZERO_COUNT=$(grep -c '"x": 0\.0' cmd_vel.txt 2>/dev/null || echo "0")
echo "" >> summary.txt
echo "=== MOVEMENT ANALYSIS ===" >> summary.txt
echo "Total velocity commands: $CMD_COUNT" >> summary.txt
echo "Movement commands (non-zero): $NON_ZERO_COUNT" >> summary.txt
echo "Stop commands (zero): $ZERO_COUNT" >> summary.txt
if [ "$NON_ZERO_COUNT" -gt 0 ]; then
echo "✅ MOVEMENT DETECTED: Robot received $NON_ZERO_COUNT movement commands" >> summary.txt
# Extract sample movement values
echo "" >> summary.txt
echo "=== SAMPLE MOVEMENT COMMANDS ===" >> summary.txt
grep '"x": 0\.[0-9]' cmd_vel.txt | head -5 | while read -r line; do
echo " $line" >> summary.txt
done
# Check for turns (non-zero angular.z)
TURN_COUNT=$(grep -c '"z": -0\.[0-9]\|"z": 0\.[0-9]' cmd_vel.txt 2>/dev/null || echo "0")
echo "" >> summary.txt
echo "Turn commands detected: $TURN_COUNT" >> summary.txt
else
echo "⚠️ NO MOVEMENT DETECTED: All commands have linear.x = 0.0" >> summary.txt
fi
else
echo "❌ ERROR: No cmd_vel data recorded" >> summary.txt
fi
# Check node output
echo "" >> summary.txt
echo "=== NODE OUTPUT SUMMARY ===" >> summary.txt
if [ -s node_output.txt ]; then
echo "Last 20 lines of node output:" >> summary.txt
tail -20 node_output.txt >> summary.txt
# Check for obstacle detections
OBSTACLE_COUNT=$(grep -c -i "obstacle\|too close\|emergency\|turning" node_output.txt 2>/dev/null || echo "0")
echo "" >> summary.txt
echo "Obstacle detections in logs: $OBSTACLE_COUNT" >> summary.txt
else
echo "No node output recorded" >> summary.txt
fi
# Check scan data
echo "" >> summary.txt
echo "=== SENSOR DATA ===" >> summary.txt
if [ -s scan.txt ]; then
SCAN_COUNT=$(wc -l < scan.txt)
echo "Scan messages: $SCAN_COUNT" >> summary.txt
else
echo "No scan data" >> summary.txt
fi
if [ -s odom.txt ]; then
ODOM_COUNT=$(wc -l < odom.txt)
echo "Odometry messages: $ODOM_COUNT" >> summary.txt
else
echo "No odometry data" >> summary.txt
fi
# List all files
echo "" >> summary.txt
echo "=== FILES CREATED ===" >> summary.txt
ls -la >> summary.txt
# Create a simple movement visualization
if [ -s cmd_vel.txt ]; then
echo "" >> summary.txt
echo "=== MOVEMENT VISUALIZATION ===" >> summary.txt
# Extract linear.x values (simplified)
grep -o '"x": [0-9.-]*' cmd_vel.txt | cut -d' ' -f2 > linear_values.txt
# Create ASCII chart
echo "Linear X velocity (sampled every 10th command):" >> summary.txt
awk 'NR % 10 == 0 {print $0}' linear_values.txt | head -30 | while read val; do
# Scale for display
bars=$(printf "%0.s█" $(seq 1 $(echo "scale=0; (($val)*50)+1" | bc 2>/dev/null || echo 1)))
printf " %6.3f | %s\n" "$val" "$bars" >> summary.txt
done
fi
echo ""
echo "=== TEST COMPLETE ==="
echo "Results saved to: $TEST_DIR"
echo "Check summary.txt for detailed analysis"
echo ""
echo "📸 SCREENSHOTS REQUIRED:"
echo "1. Terminal showing 'MOVEMENT DETECTED'"
echo "2. Gazebo window showing robot movement"
echo "3. summary.txt content"
#!/usr/bin/env python3
"""
Simple Obstacle Avoidance Algorithm for TurtleBot3
Task 3: Basic logic with distance thresholds
Rule: If min distance in front < 0.4m → stop + turn, Else → move forward
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
import numpy as np
class SimpleObstacleAvoidance(Node):
def __init__(self):
super().__init__('simple_obstacle_avoidance')
# Create publisher for velocity commands
self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
# Subscribe to LiDAR data
self.scan_sub = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10)
# Control parameters - ADJUST THESE BASED ON YOUR DATA
self.forward_speed = 0.15 # m/s - normal forward speed
self.turn_speed = 0.5 # rad/s - turning speed
self.stop_distance = 0.4 # meters - stop and turn threshold (Task requirement)
self.slow_distance = 0.6 # meters - slow down threshold
# Define front sector angles (in degrees)
self.front_angle = 30 # ±15 degrees from center
self.side_angle = 60 # For checking left/right clearance
# State tracking
self.state = "FORWARD"
self.obstacle_distance = 0.0
self.turn_direction = 1 # 1 = right, -1 = left
# For logging
self.log_counter = 0
self.get_logger().info('='*50)
self.get_logger().info('SIMPLE OBSTACLE AVOIDANCE ALGORITHM')
self.get_logger().info('='*50)
self.get_logger().info(f'Stop threshold: {self.stop_distance}m')
self.get_logger().info(f'Slow threshold: {self.slow_distance}m')
self.get_logger().info(f'Front angle: ±{self.front_angle/2}°')
self.get_logger().info('Algorithm:')
self.get_logger().info(' IF distance < 0.4m → STOP + TURN')
self.get_logger().info(' ELSE → MOVE FORWARD')
self.get_logger().info('='*50)
def get_front_distances(self, ranges, angle_min, angle_increment):
"""Extract distances in front of robot"""
num_points = len(ranges)
# Calculate indices for front sector
center_index = num_points // 2
half_sector = int((self.front_angle * np.pi / 180) / angle_increment / 2)
start_idx = center_index - half_sector
end_idx = center_index + half_sector
# Ensure indices are within bounds
start_idx = max(0, start_idx)
end_idx = min(num_points, end_idx)
# Get front ranges
front_ranges = ranges[start_idx:end_idx]
# Filter out invalid values (0, inf, nan)
valid_ranges = []
for d in front_ranges:
if not (np.isnan(d) or np.isinf(d) or d == 0):
valid_ranges.append(d)
return valid_ranges
def check_clear_direction(self, ranges, angle_min, angle_increment, start_deg, end_deg):
"""Check if direction has enough clearance"""
num_points = len(ranges)
# Convert degrees to indices
start_idx = int((start_deg * np.pi / 180 - angle_min) / angle_increment)
end_idx = int((end_deg * np.pi / 180 - angle_min) / angle_increment)
# Ensure indices are within bounds
start_idx = max(0, start_idx)
end_idx = min(num_points, end_idx)
if start_idx >= end_idx:
return float('inf')
# Get ranges for this direction
dir_ranges = ranges[start_idx:end_idx]
# Find minimum valid distance
min_dist = float('inf')
for d in dir_ranges:
if not (np.isnan(d) or np.isinf(d) or d == 0):
if d < min_dist:
min_dist = d
return min_dist
def choose_turn_direction(self, ranges, angle_min, angle_increment):
"""Choose which direction to turn based on side clearance"""
# Check left side (45-135 degrees)
left_min = self.check_clear_direction(ranges, angle_min, angle_increment, 45, 135)
# Check right side (225-315 degrees)
right_min = self.check_clear_direction(ranges, angle_min, angle_increment, 225, 315)
# Only log occasionally to avoid spam
if self.log_counter % 50 == 0:
self.get_logger().info(f'Left clearance: {left_min:.2f}m, Right clearance: {right_min:.2f}m')
# Choose direction with more space
if left_min > right_min:
if self.log_counter % 50 == 0:
self.get_logger().info('Turning LEFT (more space)')
return -1 # Negative for left turn
else:
if self.log_counter % 50 == 0:
self.get_logger().info('Turning RIGHT (more space)')
return 1 # Positive for right turn
def scan_callback(self, msg):
"""Process LiDAR data and control robot"""
try:
# Create velocity command
cmd_vel = Twist()
# Get front distances
front_distances = self.get_front_distances(msg.ranges, msg.angle_min, msg.angle_increment)
if not front_distances:
# No valid front readings - stop and wait
cmd_vel.linear.x = 0.0
cmd_vel.angular.z = 0.0
self.state = "STOP_NO_DATA"
if self.log_counter % 100 == 0:
self.get_logger().warn('No valid front LiDAR data')
else:
# Find minimum front distance
min_front = min(front_distances)
self.obstacle_distance = min_front
# TASK 3 ALGORITHM IMPLEMENTATION
if min_front < self.stop_distance:
# OBSTACLE TOO CLOSE: STOP + TURN
cmd_vel.linear.x = 0.0
# Choose turn direction
self.turn_direction = self.choose_turn_direction(
msg.ranges, msg.angle_min, msg.angle_increment)
cmd_vel.angular.z = self.turn_direction * self.turn_speed
self.state = "STOP_TURN"
# Log every 30th message to avoid spam
if self.log_counter % 30 == 0:
self.get_logger().warning(f'🚨 OBSTACLE TOO CLOSE: {min_front:.2f}m < {self.stop_distance}m')
self.get_logger().info(f'Action: STOP + TURN {"LEFT" if self.turn_direction < 0 else "RIGHT"}')
elif min_front < self.slow_distance:
# OBSTACLE NEARBY: SLOW DOWN
cmd_vel.linear.x = self.forward_speed * 0.5 # Half speed
cmd_vel.angular.z = 0.0
self.state = "SLOW"
if self.log_counter % 60 == 0:
self.get_logger().info(f'⚠️ Close obstacle: {min_front:.2f}m')
self.get_logger().info('Action: SLOW DOWN')
else:
# CLEAR PATH: MOVE FORWARD
cmd_vel.linear.x = self.forward_speed
cmd_vel.angular.z = 0.0
self.state = "FORWARD"
if self.log_counter % 90 == 0:
self.get_logger().info(f'✅ Clear path: {min_front:.2f}m')
self.get_logger().info('Action: MOVE FORWARD')
# Publish velocity command
self.cmd_pub.publish(cmd_vel)
self.log_counter += 1
# Display state every 100 messages
if self.log_counter % 100 == 0:
self.get_logger().info('-'*40)
self.get_logger().info(f'STATE: {self.state}')
self.get_logger().info(f'Front distance: {self.obstacle_distance:.2f}m')
self.get_logger().info(f'Linear: {cmd_vel.linear.x:.2f} m/s, Angular: {cmd_vel.angular.z:.2f} rad/s')
self.get_logger().info('-'*40)
except Exception as e:
# Silent fail - don't crash on errors
print(f"[Callback Error: {e}]")
return
def emergency_stop(self):
"""Emergency stop function"""
try:
cmd_vel = Twist()
cmd_vel.linear.x = 0.0
cmd_vel.angular.z = 0.0
self.cmd_pub.publish(cmd_vel)
self.get_logger().critical('🛑 EMERGENCY STOP')
except:
# If ROS context is dead, just print
print("🛑 EMERGENCY STOP ACTIVATED")
def main(args=None):
rclpy.init(args=args)
node = None # Initialize node variable
try:
node = SimpleObstacleAvoidance()
rclpy.spin(node)
except KeyboardInterrupt:
print("\nShutdown requested by user")
if node is not None:
node.emergency_stop()
except Exception as e:
print(f"\n⚠️ Node error: {e}")
print("This is likely a ROS context issue during shutdown")
finally:
# Always clean up
if node is not None:
try:
node.destroy_node()
except:
pass # Node might already be destroyed
try:
rclpy.shutdown()
except:
pass # ROS might already be shutdown
if __name__ == '__main__':
main()
#!/usr/bin/env python3
"""
Simple Obstacle Avoidance Algorithm for TurtleBot3
Task 3: Basic logic with distance thresholds
Rule: If min distance in front < 0.4m → stop + turn, Else → move forward
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
import numpy as np
class SimpleObstacleAvoidance(Node):
def __init__(self):
super().__init__('simple_obstacle_avoidance')
# Create publisher for velocity commands
self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
# Subscribe to LiDAR data
self.scan_sub = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10)
# Control parameters - ADJUST THESE BASED ON YOUR DATA
self.forward_speed = 0.15 # m/s - normal forward speed
self.turn_speed = 0.5 # rad/s - turning speed
self.stop_distance = 0.4 # meters - stop and turn threshold (Task requirement)
self.slow_distance = 0.6 # meters - slow down threshold
# Define front sector angles (in degrees)
self.front_angle = 30 # ±15 degrees from center
self.side_angle = 60 # For checking left/right clearance
# State tracking
self.state = "FORWARD"
self.obstacle_distance = 0.0
self.turn_direction = 1 # 1 = right, -1 = left
# For logging
self.log_counter = 0
self.get_logger().info('='*50)
self.get_logger().info('SIMPLE OBSTACLE AVOIDANCE ALGORITHM')
self.get_logger().info('='*50)
self.get_logger().info(f'Stop threshold: {self.stop_distance}m')
self.get_logger().info(f'Slow threshold: {self.slow_distance}m')
self.get_logger().info(f'Front angle: ±{self.front_angle/2}°')
self.get_logger().info('Algorithm:')
self.get_logger().info(' IF distance < 0.4m → STOP + TURN')
self.get_logger().info(' ELSE → MOVE FORWARD')
self.get_logger().info('='*50)
def get_front_distances(self, ranges, angle_min, angle_increment):
"""Extract distances in front of robot"""
num_points = len(ranges)
# Calculate indices for front sector
center_index = num_points // 2
half_sector = int((self.front_angle * np.pi / 180) / angle_increment / 2)
start_idx = center_index - half_sector
end_idx = center_index + half_sector
# Ensure indices are within bounds
start_idx = max(0, start_idx)
end_idx = min(num_points, end_idx)
# Get front ranges
front_ranges = ranges[start_idx:end_idx]
# Filter out invalid values (0, inf, nan)
valid_ranges = []
for d in front_ranges:
if not (np.isnan(d) or np.isinf(d) or d == 0):
valid_ranges.append(d)
return valid_ranges
def check_clear_direction(self, ranges, angle_min, angle_increment, start_deg, end_deg):
"""Check if direction has enough clearance"""
num_points = len(ranges)
# Convert degrees to indices
start_idx = int((start_deg * np.pi / 180 - angle_min) / angle_increment)
end_idx = int((end_deg * np.pi / 180 - angle_min) / angle_increment)
# Ensure indices are within bounds
start_idx = max(0, start_idx)
end_idx = min(num_points, end_idx)
if start_idx >= end_idx:
return float('inf')
# Get ranges for this direction
dir_ranges = ranges[start_idx:end_idx]
# Find minimum valid distance
min_dist = float('inf')
for d in dir_ranges:
if not (np.isnan(d) or np.isinf(d) or d == 0):
if d < min_dist:
min_dist = d
return min_dist
def choose_turn_direction(self, ranges, angle_min, angle_increment):
"""Choose which direction to turn based on side clearance"""
# Check left side (45-135 degrees)
left_min = self.check_clear_direction(ranges, angle_min, angle_increment, 45, 135)
# Check right side (225-315 degrees)
right_min = self.check_clear_direction(ranges, angle_min, angle_increment, 225, 315)
self.get_logger().info(f'Left clearance: {left_min:.2f}m, Right clearance: {right_min:.2f}m')
# Choose direction with more space
if left_min > right_min:
self.get_logger().info('Turning LEFT (more space)')
return -1 # Negative for left turn
else:
self.get_logger().info('Turning RIGHT (more space)')
return 1 # Positive for right turn
def scan_callback(self, msg):
"""Process LiDAR data and control robot"""
# Create velocity command
cmd_vel = Twist()
# Get front distances
front_distances = self.get_front_distances(msg.ranges, msg.angle_min, msg.angle_increment)
if not front_distances:
# No valid front readings - stop and wait
cmd_vel.linear.x = 0.0
cmd_vel.angular.z = 0.0
self.state = "STOP_NO_DATA"
self.get_logger().warn('No valid front LiDAR data')
else:
# Find minimum front distance
min_front = min(front_distances)
self.obstacle_distance = min_front
# TASK 3 ALGORITHM IMPLEMENTATION
if min_front < self.stop_distance:
# OBSTACLE TOO CLOSE: STOP + TURN
cmd_vel.linear.x = 0.0
# Choose turn direction
self.turn_direction = self.choose_turn_direction(
msg.ranges, msg.angle_min, msg.angle_increment)
cmd_vel.angular.z = self.turn_direction * self.turn_speed
self.state = "STOP_TURN"
# Log every 10th message to avoid spam
if self.log_counter % 10 == 0:
self.get_logger().warning(f'🚨 OBSTACLE TOO CLOSE: {min_front:.2f}m < {self.stop_distance}m')
self.get_logger().info(f'Action: STOP + TURN {"LEFT" if self.turn_direction < 0 else "RIGHT"}')
elif min_front < self.slow_distance:
# OBSTACLE NEARBY: SLOW DOWN
cmd_vel.linear.x = self.forward_speed * 0.5 # Half speed
cmd_vel.angular.z = 0.0
self.state = "SLOW"
if self.log_counter % 20 == 0:
self.get_logger().info(f'⚠️ Close obstacle: {min_front:.2f}m')
self.get_logger().info('Action: SLOW DOWN')
else:
# CLEAR PATH: MOVE FORWARD
cmd_vel.linear.x = self.forward_speed
cmd_vel.angular.z = 0.0
self.state = "FORWARD"
if self.log_counter % 30 == 0:
self.get_logger().info(f'✅ Clear path: {min_front:.2f}m')
self.get_logger().info('Action: MOVE FORWARD')
# Publish velocity command
self.cmd_pub.publish(cmd_vel)
self.log_counter += 1
# Display state every 50 messages
if self.log_counter % 50 == 0:
self.get_logger().info('-'*40)
self.get_logger().info(f'STATE: {self.state}')
self.get_logger().info(f'Front distance: {self.obstacle_distance:.2f}m')
self.get_logger().info(f'Linear: {cmd_vel.linear.x:.2f} m/s, Angular: {cmd_vel.angular.z:.2f} rad/s')
self.get_logger().info('-'*40)
def emergency_stop(self):
"""Emergency stop function"""
cmd_vel = Twist()
cmd_vel.linear.x = 0.0
cmd_vel.angular.z = 0.0
self.cmd_pub.publish(cmd_vel)
self.get_logger().critical('🛑 EMERGENCY STOP')
def main(args=None):
rclpy.init(args=args)
try:
node = SimpleObstacleAvoidance()
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Shutting down obstacle avoidance...')
node.emergency_stop()
except Exception as e:
node.get_logger().error(f'Error: {e}')
if 'node' in locals():
node.emergency_stop()
finally:
if 'node' in locals():
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
"""
Ultra Simple Obstacle Avoidance - Exactly as Task 3 specifies
Rule: If min distance in front < 0.4m → stop + turn, Else → move forward
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
import numpy as np
class UltraSimpleAvoidance(Node):
def __init__(self):
super().__init__('ultra_simple_avoidance')
self.pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.sub = self.create_subscription(LaserScan, '/scan', self.callback, 10)
# Parameters from Task 3
self.stop_threshold = 0.4 # meters
self.forward_speed = 0.2 # m/s
self.turn_speed = 0.5 # rad/s
self.get_logger().info(f'Ultra Simple Avoidance Started')
self.get_logger().info(f'Rule: distance < {self.stop_threshold}m → STOP + TURN')
self.get_logger().info(f'Rule: distance ≥ {self.stop_threshold}m → MOVE FORWARD')
def callback(self, msg):
twist = Twist()
# Get front distances (0° ± 30°)
ranges = list(msg.ranges)
num_readings = len(ranges)
# Front indices (330° to 30°)
front_start = int(num_readings * 330/360)
front_end = int(num_readings * 30/360)
if front_end <= front_start:
front_ranges = ranges[front_start:] + ranges[:front_end]
else:
front_ranges = ranges[front_start:front_end]
# Filter valid distances
valid_distances = [d for d in front_ranges if 0.05 < d < 10.0]
if not valid_distances:
# No valid data - stop
twist.linear.x = 0.0
twist.angular.z = 0.0
self.get_logger().warn('No valid front data')
else:
min_distance = min(valid_distances)
# TASK 3 ALGORITHM
if min_distance < self.stop_threshold:
# STOP + TURN
twist.linear.x = 0.0
# Simple turn logic: always turn right
twist.angular.z = self.turn_speed
self.get_logger().warning(f'OBSTACLE: {min_distance:.2f}m < {self.stop_threshold}m')
self.get_logger().info('Action: STOP + TURN RIGHT')
else:
# MOVE FORWARD
twist.linear.x = self.forward_speed
twist.angular.z = 0.0
self.get_logger().info(f'CLEAR: {min_distance:.2f}m ≥ {self.stop_threshold}m')
self.get_logger().info('Action: MOVE FORWARD')
self.pub.publish(twist)
def main(args=None):
rclpy.init(args=args)
node = UltraSimpleAvoidance()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/bin/bash
echo "=== VERIFYING ROBOT MOVEMENT ==="
# Create test directory
TEST_DIR="$HOME/movement_test_$(date +%s)"
mkdir -p "$TEST_DIR"
cd "$TEST_DIR"
echo "Test directory: $TEST_DIR"
echo ""
# Step 1: Check prerequisites
echo "1. Checking prerequisites..."
if ! ros2 topic list | grep -q "/cmd_vel"; then
echo "❌ ERROR: /cmd_vel topic not found"
echo " Make sure Gazebo is running first:"
echo " ros2 launch turtlebot3_gazebo turtlebot3_world.launch.py"
exit 1
fi
echo "✅ /cmd_vel topic exists"
# Check if /scan has data
SCAN_CHECK=$(timeout 2 ros2 topic echo /scan --once 2>&1 | head -3)
if echo "$SCAN_CHECK" | grep -q "ranges"; then
echo "✅ /scan topic has data"
else
echo "⚠️ /scan topic might not have data yet"
echo " Waiting 3 seconds for Gazebo to initialize..."
sleep 3
fi
# Source workspace
cd ~/turtlebot3_ws 2>/dev/null && source install/setup.bash 2>/dev/null
export TURTLEBOT3_MODEL=burger 2>/dev/null
echo ""
echo "2. Starting recording..."
echo " Recording /cmd_vel for 10 seconds..."
ros2 topic echo /cmd_vel > cmd_vel_test.txt 2>&1 &
RECORD_PID=$!
sleep 2 # Give recorder time to start
echo ""
echo "3. Running obstacle avoidance for 10 seconds..."
echo " Using: ultra_simple (known working version)"
timeout 10 ros2 run tb3_motion_control ultra_simple 2>&1 | tee node_output.log
echo ""
echo "4. Stopping recording..."
kill $RECORD_PID 2>/dev/null
wait $RECORD_PID 2>/dev/null 2>/dev/null
sleep 1
echo ""
echo "=== ANALYSIS ==="
echo "Checking recorded commands..."
if [ -s cmd_vel_test.txt ]; then
# Count total commands
TOTAL=$(grep -c "linear:" cmd_vel_test.txt 2>/dev/null || echo "0")
# Count movement commands (x > 0)
MOVING=$(grep -c '"x": 0\.[0-9]' cmd_vel_test.txt 2>/dev/null || echo "0")
# Count stop commands (x = 0)
STOPPED=$(grep -c '"x": 0\.0' cmd_vel_test.txt 2>/dev/null || echo "0")
echo "Total commands: $TOTAL"
echo "Movement commands: $MOVING"
echo "Stop commands: $STOPPED"
if [ "$TOTAL" -eq 0 ]; then
echo "❌ PROBLEM: No commands recorded"
echo " Possible issues:"
echo " - Gazebo not running"
echo " - Robot not spawned"
echo " - ROS topics not available"
elif [ "$MOVING" -gt 0 ]; then
echo "✅ SUCCESS: Robot IS moving!"
echo ""
echo "Sample movement values:"
grep '"x": 0\.[0-9]' cmd_vel_test.txt 2>/dev/null | head -3 | sed 's/^/ /'
# Check for turns
TURNS=$(grep -c '"z": 0\.[0-9]\|"z": -0\.[0-9]' cmd_vel_test.txt 2>/dev/null || echo "0")
echo "Turn commands: $TURNS"
# Calculate percentages
if [ "$TOTAL" -gt 0 ]; then
MOVE_PERCENT=$((MOVING * 100 / TOTAL))
STOP_PERCENT=$((STOPPED * 100 / TOTAL))
echo "Movement percentage: $MOVE_PERCENT%"
echo "Stop percentage: $STOP_PERCENT%"
fi
else
echo "❌ PROBLEM: Robot NOT moving"
echo "All commands:"
head -5 cmd_vel_test.txt 2>/dev/null | sed 's/^/ /'
echo ""
echo "Check node output:"
tail -5 node_output.log 2>/dev/null
fi
else
echo "❌ No data recorded in cmd_vel_test.txt"
echo "Quick diagnostic:"
echo " Testing /cmd_vel directly..."
timeout 2 ros2 topic echo /cmd_vel --once 2>&1 | head -3
fi
echo ""
echo "=== FILES CREATED ==="
ls -la
echo ""
echo "Data saved in: $TEST_DIR"
echo ""
echo "📸 Take screenshot of:"
echo "1. This output (especially 'SUCCESS: Robot IS moving!')"
echo "2. Gazebo window showing robot movement"
echo "3. The files listed above"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment