Last active
January 26, 2026 09:31
-
-
Save anasAlsalol/02bcca64452fade5bb48f62cad5f1e85 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| TurtleBot3 Obstacle Avoidance Node | |
| Subscribes to /scan (LiDAR) and publishes to /cmd_vel | |
| Implements: Forward movement, obstacle detection, collision avoidance | |
| """ | |
| import rclpy | |
| from rclpy.node import Node | |
| from sensor_msgs.msg import LaserScan | |
| from geometry_msgs.msg import Twist | |
| import numpy as np | |
| class ObstacleAvoider(Node): | |
| def __init__(self): | |
| super().__init__('obstacle_avoider') | |
| # Publisher for velocity commands | |
| self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10) | |
| # Subscriber to LiDAR data | |
| self.subscription = self.create_subscription( | |
| LaserScan, | |
| '/scan', | |
| self.scan_callback, | |
| 10) | |
| # Control parameters | |
| self.linear_speed = 0.15 # m/s - forward speed | |
| self.angular_speed = 0.5 # rad/s - turning speed | |
| self.min_safe_distance = 0.5 # meters - stop distance | |
| self.turn_threshold = 0.8 # meters - start turning distance | |
| # State variables | |
| self.obstacle_detected = False | |
| self.turn_direction = 0 # -1: left, 1: right | |
| self.last_turn_time = self.get_clock().now() | |
| # Scan data storage | |
| self.scan_ranges = [] | |
| self.scan_available = False | |
| # Timer for periodic control | |
| self.timer = self.create_timer(0.1, self.control_loop) # 10 Hz | |
| self.get_logger().info('🎯 Obstacle Avoider Node Started') | |
| self.get_logger().info(f'Safe distance: {self.min_safe_distance}m') | |
| self.get_logger().info(f'Forward speed: {self.linear_speed}m/s') | |
| self.get_logger().info('Ready to navigate!') | |
| def scan_callback(self, msg): | |
| """Process incoming LiDAR data""" | |
| self.scan_ranges = list(msg.ranges) | |
| self.scan_available = True | |
| # Replace infinite values with max range | |
| for i in range(len(self.scan_ranges)): | |
| if np.isinf(self.scan_ranges[i]): | |
| self.scan_ranges[i] = msg.range_max | |
| def check_obstacles(self): | |
| """Check for obstacles in front of robot""" | |
| if not self.scan_available or len(self.scan_ranges) == 0: | |
| return False, 0 | |
| # Define front sector (45 degrees centered on front) | |
| num_readings = len(self.scan_ranges) | |
| front_start = int(num_readings * 0.4375) # 315 degrees | |
| front_end = int(num_readings * 0.5625) # 45 degrees | |
| # Extract front readings | |
| front_readings = self.scan_ranges[front_start:front_end] | |
| # Check for obstacles | |
| min_distance = min(front_readings) if front_readings else float('inf') | |
| if min_distance < self.min_safe_distance: | |
| self.get_logger().warning(f'🚨 Obstacle detected at {min_distance:.2f}m') | |
| return True, min_distance | |
| elif min_distance < self.turn_threshold: | |
| self.get_logger().info(f'⚠️ Close obstacle at {min_distance:.2f}m') | |
| return False, min_distance | |
| return False, min_distance | |
| def choose_turn_direction(self): | |
| """Choose best direction to turn based on side obstacles""" | |
| if not self.scan_available: | |
| return 1 # Default to right turn | |
| num_readings = len(self.scan_ranges) | |
| # Left side (90-135 degrees) | |
| left_start = int(num_readings * 0.25) # 90 degrees | |
| left_end = int(num_readings * 0.375) # 135 degrees | |
| # Right side (225-270 degrees) | |
| right_start = int(num_readings * 0.625) # 225 degrees | |
| right_end = int(num_readings * 0.75) # 270 degrees | |
| left_readings = self.scan_ranges[left_start:left_end] | |
| right_readings = self.scan_ranges[right_start:right_end] | |
| # Calculate average distances | |
| left_avg = np.mean(left_readings) if left_readings else float('inf') | |
| right_avg = np.mean(right_readings) if right_readings else float('inf') | |
| # Choose direction with more space | |
| if left_avg > right_avg: | |
| self.get_logger().info('🔄 Turning LEFT (more space)') | |
| return -1 # Left turn | |
| else: | |
| self.get_logger().info('🔄 Turning RIGHT (more space)') | |
| return 1 # Right turn | |
| def control_loop(self): | |
| """Main control loop - called by timer""" | |
| if not self.scan_available: | |
| return | |
| # Create Twist message | |
| twist_msg = Twist() | |
| # Check for obstacles | |
| obstacle_detected, distance = self.check_obstacles() | |
| if obstacle_detected: | |
| # STOP - obstacle too close | |
| twist_msg.linear.x = 0.0 | |
| twist_msg.angular.z = 0.0 | |
| # Choose turn direction | |
| self.turn_direction = self.choose_turn_direction() | |
| self.last_turn_time = self.get_clock().now() | |
| self.get_logger().warning(f'🛑 STOPPED - Obstacle at {distance:.2f}m') | |
| elif distance < self.turn_threshold: | |
| # SLOW DOWN and prepare to turn | |
| twist_msg.linear.x = self.linear_speed * 0.5 # Reduced speed | |
| twist_msg.angular.z = 0.0 | |
| self.get_logger().info(f'🐢 SLOWING - Obstacle at {distance:.2f}m') | |
| else: | |
| # NORMAL FORWARD MOVEMENT | |
| twist_msg.linear.x = self.linear_speed | |
| twist_msg.angular.z = 0.0 | |
| # Log periodically | |
| current_time = self.get_clock().now() | |
| if (current_time - self.last_turn_time).nanoseconds > 3e9: # Every 3 seconds | |
| self.get_logger().info(f'🚀 Moving FORWARD - Clear path ahead') | |
| self.last_turn_time = current_time | |
| # Publish velocity command | |
| self.publisher_.publish(twist_msg) | |
| def emergency_stop(self): | |
| """Emergency stop function""" | |
| twist_msg = Twist() | |
| twist_msg.linear.x = 0.0 | |
| twist_msg.angular.z = 0.0 | |
| self.publisher_.publish(twist_msg) | |
| self.get_logger().critical('🛑 EMERGENCY STOP ACTIVATED') | |
| def main(args=None): | |
| rclpy.init(args=args) | |
| try: | |
| node = ObstacleAvoider() | |
| rclpy.spin(node) | |
| except KeyboardInterrupt: | |
| node.get_logger().info('Shutting down obstacle avoider...') | |
| node.emergency_stop() | |
| except Exception as e: | |
| node.get_logger().error(f'Error: {e}') | |
| if 'node' in locals(): | |
| node.emergency_stop() | |
| finally: | |
| if 'node' in locals(): | |
| node.destroy_node() | |
| rclpy.shutdown() | |
| if __name__ == '__main__': | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/bin/bash | |
| # FIXED Task 4 Test Script - Proper movement detection | |
| echo "=== TASK 4: OBSTACLE AVOIDANCE TEST ===" | |
| echo "Duration: 2 minutes" | |
| # Create directory | |
| TEST_DIR="$HOME/task4_test_$(date +%Y%m%d_%H%M%S)" | |
| mkdir -p "$TEST_DIR" | |
| cd "$TEST_DIR" | |
| echo "Test directory: $TEST_DIR" | |
| # Start bag recording | |
| echo "Starting bag recording..." | |
| ros2 bag record -o task4_bag -a & | |
| BAG_PID=$! | |
| # Start individual topic recording (RAW format for better parsing) | |
| echo "Starting topic recording..." | |
| ros2 topic echo /scan > scan.txt & | |
| SCAN_PID=$! | |
| ros2 topic echo /cmd_vel > cmd_vel.txt & | |
| CMD_PID=$! | |
| ros2 topic echo /odom > odom.txt & | |
| ODOM_PID=$! | |
| # Record system info | |
| echo "Recording system information..." | |
| date > test_info.txt | |
| ros2 --version >> test_info.txt | |
| echo "TURTLEBOT3_MODEL=$TURTLEBOT3_MODEL" >> test_info.txt | |
| ros2 node list > nodes.txt | |
| ros2 topic list > topics.txt | |
| echo "Test starting in 5 seconds..." | |
| sleep 5 | |
| # Run the node (in background to capture output) | |
| echo "Starting obstacle avoidance node..." | |
| ros2 run tb3_motion_control simple_avoidance 2>&1 | tee node_output.txt & | |
| NODE_PID=$! | |
| echo "Test running... (120 seconds)" | |
| for i in {1..120}; do | |
| if (( i % 30 == 0 )); then | |
| echo "Elapsed: ${i}s / 120s" | |
| fi | |
| sleep 1 | |
| done | |
| echo "Test complete!" | |
| echo "Stopping recordings..." | |
| # Stop everything | |
| kill $NODE_PID 2>/dev/null | |
| sleep 2 | |
| kill $BAG_PID $SCAN_PID $CMD_PID $ODOM_PID 2>/dev/null | |
| sleep 2 | |
| # Generate summary with PROPER movement detection | |
| echo "Generating summary..." | |
| echo "=== TEST SUMMARY ===" > summary.txt | |
| echo "Directory: $TEST_DIR" >> summary.txt | |
| echo "Duration: 120 seconds" >> summary.txt | |
| echo "" >> summary.txt | |
| # Check cmd_vel data PROPERLY | |
| if [ -s cmd_vel.txt ]; then | |
| CMD_COUNT=$(wc -l < cmd_vel.txt) | |
| echo "Commands recorded: $CMD_COUNT" >> summary.txt | |
| # FIXED: Proper movement detection | |
| # Count non-zero linear.x commands | |
| NON_ZERO_COUNT=$(grep -c '"x": 0\.[0-9]' cmd_vel.txt 2>/dev/null || echo "0") | |
| ZERO_COUNT=$(grep -c '"x": 0\.0' cmd_vel.txt 2>/dev/null || echo "0") | |
| echo "" >> summary.txt | |
| echo "=== MOVEMENT ANALYSIS ===" >> summary.txt | |
| echo "Total velocity commands: $CMD_COUNT" >> summary.txt | |
| echo "Movement commands (non-zero): $NON_ZERO_COUNT" >> summary.txt | |
| echo "Stop commands (zero): $ZERO_COUNT" >> summary.txt | |
| if [ "$NON_ZERO_COUNT" -gt 0 ]; then | |
| echo "✅ MOVEMENT DETECTED: Robot received $NON_ZERO_COUNT movement commands" >> summary.txt | |
| # Extract sample movement values | |
| echo "" >> summary.txt | |
| echo "=== SAMPLE MOVEMENT COMMANDS ===" >> summary.txt | |
| grep '"x": 0\.[0-9]' cmd_vel.txt | head -5 | while read -r line; do | |
| echo " $line" >> summary.txt | |
| done | |
| # Check for turns (non-zero angular.z) | |
| TURN_COUNT=$(grep -c '"z": -0\.[0-9]\|"z": 0\.[0-9]' cmd_vel.txt 2>/dev/null || echo "0") | |
| echo "" >> summary.txt | |
| echo "Turn commands detected: $TURN_COUNT" >> summary.txt | |
| else | |
| echo "⚠️ NO MOVEMENT DETECTED: All commands have linear.x = 0.0" >> summary.txt | |
| fi | |
| else | |
| echo "❌ ERROR: No cmd_vel data recorded" >> summary.txt | |
| fi | |
| # Check node output | |
| echo "" >> summary.txt | |
| echo "=== NODE OUTPUT SUMMARY ===" >> summary.txt | |
| if [ -s node_output.txt ]; then | |
| echo "Last 20 lines of node output:" >> summary.txt | |
| tail -20 node_output.txt >> summary.txt | |
| # Check for obstacle detections | |
| OBSTACLE_COUNT=$(grep -c -i "obstacle\|too close\|emergency\|turning" node_output.txt 2>/dev/null || echo "0") | |
| echo "" >> summary.txt | |
| echo "Obstacle detections in logs: $OBSTACLE_COUNT" >> summary.txt | |
| else | |
| echo "No node output recorded" >> summary.txt | |
| fi | |
| # Check scan data | |
| echo "" >> summary.txt | |
| echo "=== SENSOR DATA ===" >> summary.txt | |
| if [ -s scan.txt ]; then | |
| SCAN_COUNT=$(wc -l < scan.txt) | |
| echo "Scan messages: $SCAN_COUNT" >> summary.txt | |
| else | |
| echo "No scan data" >> summary.txt | |
| fi | |
| if [ -s odom.txt ]; then | |
| ODOM_COUNT=$(wc -l < odom.txt) | |
| echo "Odometry messages: $ODOM_COUNT" >> summary.txt | |
| else | |
| echo "No odometry data" >> summary.txt | |
| fi | |
| # List all files | |
| echo "" >> summary.txt | |
| echo "=== FILES CREATED ===" >> summary.txt | |
| ls -la >> summary.txt | |
| # Create a simple movement visualization | |
| if [ -s cmd_vel.txt ]; then | |
| echo "" >> summary.txt | |
| echo "=== MOVEMENT VISUALIZATION ===" >> summary.txt | |
| # Extract linear.x values (simplified) | |
| grep -o '"x": [0-9.-]*' cmd_vel.txt | cut -d' ' -f2 > linear_values.txt | |
| # Create ASCII chart | |
| echo "Linear X velocity (sampled every 10th command):" >> summary.txt | |
| awk 'NR % 10 == 0 {print $0}' linear_values.txt | head -30 | while read val; do | |
| # Scale for display | |
| bars=$(printf "%0.s█" $(seq 1 $(echo "scale=0; (($val)*50)+1" | bc 2>/dev/null || echo 1))) | |
| printf " %6.3f | %s\n" "$val" "$bars" >> summary.txt | |
| done | |
| fi | |
| echo "" | |
| echo "=== TEST COMPLETE ===" | |
| echo "Results saved to: $TEST_DIR" | |
| echo "Check summary.txt for detailed analysis" | |
| echo "" | |
| echo "📸 SCREENSHOTS REQUIRED:" | |
| echo "1. Terminal showing 'MOVEMENT DETECTED'" | |
| echo "2. Gazebo window showing robot movement" | |
| echo "3. summary.txt content" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Simple Obstacle Avoidance Algorithm for TurtleBot3 | |
| Task 3: Basic logic with distance thresholds | |
| Rule: If min distance in front < 0.4m → stop + turn, Else → move forward | |
| """ | |
| import rclpy | |
| from rclpy.node import Node | |
| from sensor_msgs.msg import LaserScan | |
| from geometry_msgs.msg import Twist | |
| import numpy as np | |
| class SimpleObstacleAvoidance(Node): | |
| def __init__(self): | |
| super().__init__('simple_obstacle_avoidance') | |
| # Create publisher for velocity commands | |
| self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10) | |
| # Subscribe to LiDAR data | |
| self.scan_sub = self.create_subscription( | |
| LaserScan, | |
| '/scan', | |
| self.scan_callback, | |
| 10) | |
| # Control parameters - ADJUST THESE BASED ON YOUR DATA | |
| self.forward_speed = 0.15 # m/s - normal forward speed | |
| self.turn_speed = 0.5 # rad/s - turning speed | |
| self.stop_distance = 0.4 # meters - stop and turn threshold (Task requirement) | |
| self.slow_distance = 0.6 # meters - slow down threshold | |
| # Define front sector angles (in degrees) | |
| self.front_angle = 30 # ±15 degrees from center | |
| self.side_angle = 60 # For checking left/right clearance | |
| # State tracking | |
| self.state = "FORWARD" | |
| self.obstacle_distance = 0.0 | |
| self.turn_direction = 1 # 1 = right, -1 = left | |
| # For logging | |
| self.log_counter = 0 | |
| self.get_logger().info('='*50) | |
| self.get_logger().info('SIMPLE OBSTACLE AVOIDANCE ALGORITHM') | |
| self.get_logger().info('='*50) | |
| self.get_logger().info(f'Stop threshold: {self.stop_distance}m') | |
| self.get_logger().info(f'Slow threshold: {self.slow_distance}m') | |
| self.get_logger().info(f'Front angle: ±{self.front_angle/2}°') | |
| self.get_logger().info('Algorithm:') | |
| self.get_logger().info(' IF distance < 0.4m → STOP + TURN') | |
| self.get_logger().info(' ELSE → MOVE FORWARD') | |
| self.get_logger().info('='*50) | |
| def get_front_distances(self, ranges, angle_min, angle_increment): | |
| """Extract distances in front of robot""" | |
| num_points = len(ranges) | |
| # Calculate indices for front sector | |
| center_index = num_points // 2 | |
| half_sector = int((self.front_angle * np.pi / 180) / angle_increment / 2) | |
| start_idx = center_index - half_sector | |
| end_idx = center_index + half_sector | |
| # Ensure indices are within bounds | |
| start_idx = max(0, start_idx) | |
| end_idx = min(num_points, end_idx) | |
| # Get front ranges | |
| front_ranges = ranges[start_idx:end_idx] | |
| # Filter out invalid values (0, inf, nan) | |
| valid_ranges = [] | |
| for d in front_ranges: | |
| if not (np.isnan(d) or np.isinf(d) or d == 0): | |
| valid_ranges.append(d) | |
| return valid_ranges | |
| def check_clear_direction(self, ranges, angle_min, angle_increment, start_deg, end_deg): | |
| """Check if direction has enough clearance""" | |
| num_points = len(ranges) | |
| # Convert degrees to indices | |
| start_idx = int((start_deg * np.pi / 180 - angle_min) / angle_increment) | |
| end_idx = int((end_deg * np.pi / 180 - angle_min) / angle_increment) | |
| # Ensure indices are within bounds | |
| start_idx = max(0, start_idx) | |
| end_idx = min(num_points, end_idx) | |
| if start_idx >= end_idx: | |
| return float('inf') | |
| # Get ranges for this direction | |
| dir_ranges = ranges[start_idx:end_idx] | |
| # Find minimum valid distance | |
| min_dist = float('inf') | |
| for d in dir_ranges: | |
| if not (np.isnan(d) or np.isinf(d) or d == 0): | |
| if d < min_dist: | |
| min_dist = d | |
| return min_dist | |
| def choose_turn_direction(self, ranges, angle_min, angle_increment): | |
| """Choose which direction to turn based on side clearance""" | |
| # Check left side (45-135 degrees) | |
| left_min = self.check_clear_direction(ranges, angle_min, angle_increment, 45, 135) | |
| # Check right side (225-315 degrees) | |
| right_min = self.check_clear_direction(ranges, angle_min, angle_increment, 225, 315) | |
| # Only log occasionally to avoid spam | |
| if self.log_counter % 50 == 0: | |
| self.get_logger().info(f'Left clearance: {left_min:.2f}m, Right clearance: {right_min:.2f}m') | |
| # Choose direction with more space | |
| if left_min > right_min: | |
| if self.log_counter % 50 == 0: | |
| self.get_logger().info('Turning LEFT (more space)') | |
| return -1 # Negative for left turn | |
| else: | |
| if self.log_counter % 50 == 0: | |
| self.get_logger().info('Turning RIGHT (more space)') | |
| return 1 # Positive for right turn | |
| def scan_callback(self, msg): | |
| """Process LiDAR data and control robot""" | |
| try: | |
| # Create velocity command | |
| cmd_vel = Twist() | |
| # Get front distances | |
| front_distances = self.get_front_distances(msg.ranges, msg.angle_min, msg.angle_increment) | |
| if not front_distances: | |
| # No valid front readings - stop and wait | |
| cmd_vel.linear.x = 0.0 | |
| cmd_vel.angular.z = 0.0 | |
| self.state = "STOP_NO_DATA" | |
| if self.log_counter % 100 == 0: | |
| self.get_logger().warn('No valid front LiDAR data') | |
| else: | |
| # Find minimum front distance | |
| min_front = min(front_distances) | |
| self.obstacle_distance = min_front | |
| # TASK 3 ALGORITHM IMPLEMENTATION | |
| if min_front < self.stop_distance: | |
| # OBSTACLE TOO CLOSE: STOP + TURN | |
| cmd_vel.linear.x = 0.0 | |
| # Choose turn direction | |
| self.turn_direction = self.choose_turn_direction( | |
| msg.ranges, msg.angle_min, msg.angle_increment) | |
| cmd_vel.angular.z = self.turn_direction * self.turn_speed | |
| self.state = "STOP_TURN" | |
| # Log every 30th message to avoid spam | |
| if self.log_counter % 30 == 0: | |
| self.get_logger().warning(f'🚨 OBSTACLE TOO CLOSE: {min_front:.2f}m < {self.stop_distance}m') | |
| self.get_logger().info(f'Action: STOP + TURN {"LEFT" if self.turn_direction < 0 else "RIGHT"}') | |
| elif min_front < self.slow_distance: | |
| # OBSTACLE NEARBY: SLOW DOWN | |
| cmd_vel.linear.x = self.forward_speed * 0.5 # Half speed | |
| cmd_vel.angular.z = 0.0 | |
| self.state = "SLOW" | |
| if self.log_counter % 60 == 0: | |
| self.get_logger().info(f'⚠️ Close obstacle: {min_front:.2f}m') | |
| self.get_logger().info('Action: SLOW DOWN') | |
| else: | |
| # CLEAR PATH: MOVE FORWARD | |
| cmd_vel.linear.x = self.forward_speed | |
| cmd_vel.angular.z = 0.0 | |
| self.state = "FORWARD" | |
| if self.log_counter % 90 == 0: | |
| self.get_logger().info(f'✅ Clear path: {min_front:.2f}m') | |
| self.get_logger().info('Action: MOVE FORWARD') | |
| # Publish velocity command | |
| self.cmd_pub.publish(cmd_vel) | |
| self.log_counter += 1 | |
| # Display state every 100 messages | |
| if self.log_counter % 100 == 0: | |
| self.get_logger().info('-'*40) | |
| self.get_logger().info(f'STATE: {self.state}') | |
| self.get_logger().info(f'Front distance: {self.obstacle_distance:.2f}m') | |
| self.get_logger().info(f'Linear: {cmd_vel.linear.x:.2f} m/s, Angular: {cmd_vel.angular.z:.2f} rad/s') | |
| self.get_logger().info('-'*40) | |
| except Exception as e: | |
| # Silent fail - don't crash on errors | |
| print(f"[Callback Error: {e}]") | |
| return | |
| def emergency_stop(self): | |
| """Emergency stop function""" | |
| try: | |
| cmd_vel = Twist() | |
| cmd_vel.linear.x = 0.0 | |
| cmd_vel.angular.z = 0.0 | |
| self.cmd_pub.publish(cmd_vel) | |
| self.get_logger().critical('🛑 EMERGENCY STOP') | |
| except: | |
| # If ROS context is dead, just print | |
| print("🛑 EMERGENCY STOP ACTIVATED") | |
| def main(args=None): | |
| rclpy.init(args=args) | |
| node = None # Initialize node variable | |
| try: | |
| node = SimpleObstacleAvoidance() | |
| rclpy.spin(node) | |
| except KeyboardInterrupt: | |
| print("\nShutdown requested by user") | |
| if node is not None: | |
| node.emergency_stop() | |
| except Exception as e: | |
| print(f"\n⚠️ Node error: {e}") | |
| print("This is likely a ROS context issue during shutdown") | |
| finally: | |
| # Always clean up | |
| if node is not None: | |
| try: | |
| node.destroy_node() | |
| except: | |
| pass # Node might already be destroyed | |
| try: | |
| rclpy.shutdown() | |
| except: | |
| pass # ROS might already be shutdown | |
| if __name__ == '__main__': | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Simple Obstacle Avoidance Algorithm for TurtleBot3 | |
| Task 3: Basic logic with distance thresholds | |
| Rule: If min distance in front < 0.4m → stop + turn, Else → move forward | |
| """ | |
| import rclpy | |
| from rclpy.node import Node | |
| from sensor_msgs.msg import LaserScan | |
| from geometry_msgs.msg import Twist | |
| import numpy as np | |
| class SimpleObstacleAvoidance(Node): | |
| def __init__(self): | |
| super().__init__('simple_obstacle_avoidance') | |
| # Create publisher for velocity commands | |
| self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10) | |
| # Subscribe to LiDAR data | |
| self.scan_sub = self.create_subscription( | |
| LaserScan, | |
| '/scan', | |
| self.scan_callback, | |
| 10) | |
| # Control parameters - ADJUST THESE BASED ON YOUR DATA | |
| self.forward_speed = 0.15 # m/s - normal forward speed | |
| self.turn_speed = 0.5 # rad/s - turning speed | |
| self.stop_distance = 0.4 # meters - stop and turn threshold (Task requirement) | |
| self.slow_distance = 0.6 # meters - slow down threshold | |
| # Define front sector angles (in degrees) | |
| self.front_angle = 30 # ±15 degrees from center | |
| self.side_angle = 60 # For checking left/right clearance | |
| # State tracking | |
| self.state = "FORWARD" | |
| self.obstacle_distance = 0.0 | |
| self.turn_direction = 1 # 1 = right, -1 = left | |
| # For logging | |
| self.log_counter = 0 | |
| self.get_logger().info('='*50) | |
| self.get_logger().info('SIMPLE OBSTACLE AVOIDANCE ALGORITHM') | |
| self.get_logger().info('='*50) | |
| self.get_logger().info(f'Stop threshold: {self.stop_distance}m') | |
| self.get_logger().info(f'Slow threshold: {self.slow_distance}m') | |
| self.get_logger().info(f'Front angle: ±{self.front_angle/2}°') | |
| self.get_logger().info('Algorithm:') | |
| self.get_logger().info(' IF distance < 0.4m → STOP + TURN') | |
| self.get_logger().info(' ELSE → MOVE FORWARD') | |
| self.get_logger().info('='*50) | |
| def get_front_distances(self, ranges, angle_min, angle_increment): | |
| """Extract distances in front of robot""" | |
| num_points = len(ranges) | |
| # Calculate indices for front sector | |
| center_index = num_points // 2 | |
| half_sector = int((self.front_angle * np.pi / 180) / angle_increment / 2) | |
| start_idx = center_index - half_sector | |
| end_idx = center_index + half_sector | |
| # Ensure indices are within bounds | |
| start_idx = max(0, start_idx) | |
| end_idx = min(num_points, end_idx) | |
| # Get front ranges | |
| front_ranges = ranges[start_idx:end_idx] | |
| # Filter out invalid values (0, inf, nan) | |
| valid_ranges = [] | |
| for d in front_ranges: | |
| if not (np.isnan(d) or np.isinf(d) or d == 0): | |
| valid_ranges.append(d) | |
| return valid_ranges | |
| def check_clear_direction(self, ranges, angle_min, angle_increment, start_deg, end_deg): | |
| """Check if direction has enough clearance""" | |
| num_points = len(ranges) | |
| # Convert degrees to indices | |
| start_idx = int((start_deg * np.pi / 180 - angle_min) / angle_increment) | |
| end_idx = int((end_deg * np.pi / 180 - angle_min) / angle_increment) | |
| # Ensure indices are within bounds | |
| start_idx = max(0, start_idx) | |
| end_idx = min(num_points, end_idx) | |
| if start_idx >= end_idx: | |
| return float('inf') | |
| # Get ranges for this direction | |
| dir_ranges = ranges[start_idx:end_idx] | |
| # Find minimum valid distance | |
| min_dist = float('inf') | |
| for d in dir_ranges: | |
| if not (np.isnan(d) or np.isinf(d) or d == 0): | |
| if d < min_dist: | |
| min_dist = d | |
| return min_dist | |
| def choose_turn_direction(self, ranges, angle_min, angle_increment): | |
| """Choose which direction to turn based on side clearance""" | |
| # Check left side (45-135 degrees) | |
| left_min = self.check_clear_direction(ranges, angle_min, angle_increment, 45, 135) | |
| # Check right side (225-315 degrees) | |
| right_min = self.check_clear_direction(ranges, angle_min, angle_increment, 225, 315) | |
| self.get_logger().info(f'Left clearance: {left_min:.2f}m, Right clearance: {right_min:.2f}m') | |
| # Choose direction with more space | |
| if left_min > right_min: | |
| self.get_logger().info('Turning LEFT (more space)') | |
| return -1 # Negative for left turn | |
| else: | |
| self.get_logger().info('Turning RIGHT (more space)') | |
| return 1 # Positive for right turn | |
| def scan_callback(self, msg): | |
| """Process LiDAR data and control robot""" | |
| # Create velocity command | |
| cmd_vel = Twist() | |
| # Get front distances | |
| front_distances = self.get_front_distances(msg.ranges, msg.angle_min, msg.angle_increment) | |
| if not front_distances: | |
| # No valid front readings - stop and wait | |
| cmd_vel.linear.x = 0.0 | |
| cmd_vel.angular.z = 0.0 | |
| self.state = "STOP_NO_DATA" | |
| self.get_logger().warn('No valid front LiDAR data') | |
| else: | |
| # Find minimum front distance | |
| min_front = min(front_distances) | |
| self.obstacle_distance = min_front | |
| # TASK 3 ALGORITHM IMPLEMENTATION | |
| if min_front < self.stop_distance: | |
| # OBSTACLE TOO CLOSE: STOP + TURN | |
| cmd_vel.linear.x = 0.0 | |
| # Choose turn direction | |
| self.turn_direction = self.choose_turn_direction( | |
| msg.ranges, msg.angle_min, msg.angle_increment) | |
| cmd_vel.angular.z = self.turn_direction * self.turn_speed | |
| self.state = "STOP_TURN" | |
| # Log every 10th message to avoid spam | |
| if self.log_counter % 10 == 0: | |
| self.get_logger().warning(f'🚨 OBSTACLE TOO CLOSE: {min_front:.2f}m < {self.stop_distance}m') | |
| self.get_logger().info(f'Action: STOP + TURN {"LEFT" if self.turn_direction < 0 else "RIGHT"}') | |
| elif min_front < self.slow_distance: | |
| # OBSTACLE NEARBY: SLOW DOWN | |
| cmd_vel.linear.x = self.forward_speed * 0.5 # Half speed | |
| cmd_vel.angular.z = 0.0 | |
| self.state = "SLOW" | |
| if self.log_counter % 20 == 0: | |
| self.get_logger().info(f'⚠️ Close obstacle: {min_front:.2f}m') | |
| self.get_logger().info('Action: SLOW DOWN') | |
| else: | |
| # CLEAR PATH: MOVE FORWARD | |
| cmd_vel.linear.x = self.forward_speed | |
| cmd_vel.angular.z = 0.0 | |
| self.state = "FORWARD" | |
| if self.log_counter % 30 == 0: | |
| self.get_logger().info(f'✅ Clear path: {min_front:.2f}m') | |
| self.get_logger().info('Action: MOVE FORWARD') | |
| # Publish velocity command | |
| self.cmd_pub.publish(cmd_vel) | |
| self.log_counter += 1 | |
| # Display state every 50 messages | |
| if self.log_counter % 50 == 0: | |
| self.get_logger().info('-'*40) | |
| self.get_logger().info(f'STATE: {self.state}') | |
| self.get_logger().info(f'Front distance: {self.obstacle_distance:.2f}m') | |
| self.get_logger().info(f'Linear: {cmd_vel.linear.x:.2f} m/s, Angular: {cmd_vel.angular.z:.2f} rad/s') | |
| self.get_logger().info('-'*40) | |
| def emergency_stop(self): | |
| """Emergency stop function""" | |
| cmd_vel = Twist() | |
| cmd_vel.linear.x = 0.0 | |
| cmd_vel.angular.z = 0.0 | |
| self.cmd_pub.publish(cmd_vel) | |
| self.get_logger().critical('🛑 EMERGENCY STOP') | |
| def main(args=None): | |
| rclpy.init(args=args) | |
| try: | |
| node = SimpleObstacleAvoidance() | |
| rclpy.spin(node) | |
| except KeyboardInterrupt: | |
| node.get_logger().info('Shutting down obstacle avoidance...') | |
| node.emergency_stop() | |
| except Exception as e: | |
| node.get_logger().error(f'Error: {e}') | |
| if 'node' in locals(): | |
| node.emergency_stop() | |
| finally: | |
| if 'node' in locals(): | |
| node.destroy_node() | |
| rclpy.shutdown() | |
| if __name__ == '__main__': | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Ultra Simple Obstacle Avoidance - Exactly as Task 3 specifies | |
| Rule: If min distance in front < 0.4m → stop + turn, Else → move forward | |
| """ | |
| import rclpy | |
| from rclpy.node import Node | |
| from sensor_msgs.msg import LaserScan | |
| from geometry_msgs.msg import Twist | |
| import numpy as np | |
| class UltraSimpleAvoidance(Node): | |
| def __init__(self): | |
| super().__init__('ultra_simple_avoidance') | |
| self.pub = self.create_publisher(Twist, '/cmd_vel', 10) | |
| self.sub = self.create_subscription(LaserScan, '/scan', self.callback, 10) | |
| # Parameters from Task 3 | |
| self.stop_threshold = 0.4 # meters | |
| self.forward_speed = 0.2 # m/s | |
| self.turn_speed = 0.5 # rad/s | |
| self.get_logger().info(f'Ultra Simple Avoidance Started') | |
| self.get_logger().info(f'Rule: distance < {self.stop_threshold}m → STOP + TURN') | |
| self.get_logger().info(f'Rule: distance ≥ {self.stop_threshold}m → MOVE FORWARD') | |
| def callback(self, msg): | |
| twist = Twist() | |
| # Get front distances (0° ± 30°) | |
| ranges = list(msg.ranges) | |
| num_readings = len(ranges) | |
| # Front indices (330° to 30°) | |
| front_start = int(num_readings * 330/360) | |
| front_end = int(num_readings * 30/360) | |
| if front_end <= front_start: | |
| front_ranges = ranges[front_start:] + ranges[:front_end] | |
| else: | |
| front_ranges = ranges[front_start:front_end] | |
| # Filter valid distances | |
| valid_distances = [d for d in front_ranges if 0.05 < d < 10.0] | |
| if not valid_distances: | |
| # No valid data - stop | |
| twist.linear.x = 0.0 | |
| twist.angular.z = 0.0 | |
| self.get_logger().warn('No valid front data') | |
| else: | |
| min_distance = min(valid_distances) | |
| # TASK 3 ALGORITHM | |
| if min_distance < self.stop_threshold: | |
| # STOP + TURN | |
| twist.linear.x = 0.0 | |
| # Simple turn logic: always turn right | |
| twist.angular.z = self.turn_speed | |
| self.get_logger().warning(f'OBSTACLE: {min_distance:.2f}m < {self.stop_threshold}m') | |
| self.get_logger().info('Action: STOP + TURN RIGHT') | |
| else: | |
| # MOVE FORWARD | |
| twist.linear.x = self.forward_speed | |
| twist.angular.z = 0.0 | |
| self.get_logger().info(f'CLEAR: {min_distance:.2f}m ≥ {self.stop_threshold}m') | |
| self.get_logger().info('Action: MOVE FORWARD') | |
| self.pub.publish(twist) | |
| def main(args=None): | |
| rclpy.init(args=args) | |
| node = UltraSimpleAvoidance() | |
| rclpy.spin(node) | |
| node.destroy_node() | |
| rclpy.shutdown() | |
| if __name__ == '__main__': | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/bin/bash | |
| echo "=== VERIFYING ROBOT MOVEMENT ===" | |
| # Create test directory | |
| TEST_DIR="$HOME/movement_test_$(date +%s)" | |
| mkdir -p "$TEST_DIR" | |
| cd "$TEST_DIR" | |
| echo "Test directory: $TEST_DIR" | |
| echo "" | |
| # Step 1: Check prerequisites | |
| echo "1. Checking prerequisites..." | |
| if ! ros2 topic list | grep -q "/cmd_vel"; then | |
| echo "❌ ERROR: /cmd_vel topic not found" | |
| echo " Make sure Gazebo is running first:" | |
| echo " ros2 launch turtlebot3_gazebo turtlebot3_world.launch.py" | |
| exit 1 | |
| fi | |
| echo "✅ /cmd_vel topic exists" | |
| # Check if /scan has data | |
| SCAN_CHECK=$(timeout 2 ros2 topic echo /scan --once 2>&1 | head -3) | |
| if echo "$SCAN_CHECK" | grep -q "ranges"; then | |
| echo "✅ /scan topic has data" | |
| else | |
| echo "⚠️ /scan topic might not have data yet" | |
| echo " Waiting 3 seconds for Gazebo to initialize..." | |
| sleep 3 | |
| fi | |
| # Source workspace | |
| cd ~/turtlebot3_ws 2>/dev/null && source install/setup.bash 2>/dev/null | |
| export TURTLEBOT3_MODEL=burger 2>/dev/null | |
| echo "" | |
| echo "2. Starting recording..." | |
| echo " Recording /cmd_vel for 10 seconds..." | |
| ros2 topic echo /cmd_vel > cmd_vel_test.txt 2>&1 & | |
| RECORD_PID=$! | |
| sleep 2 # Give recorder time to start | |
| echo "" | |
| echo "3. Running obstacle avoidance for 10 seconds..." | |
| echo " Using: ultra_simple (known working version)" | |
| timeout 10 ros2 run tb3_motion_control ultra_simple 2>&1 | tee node_output.log | |
| echo "" | |
| echo "4. Stopping recording..." | |
| kill $RECORD_PID 2>/dev/null | |
| wait $RECORD_PID 2>/dev/null 2>/dev/null | |
| sleep 1 | |
| echo "" | |
| echo "=== ANALYSIS ===" | |
| echo "Checking recorded commands..." | |
| if [ -s cmd_vel_test.txt ]; then | |
| # Count total commands | |
| TOTAL=$(grep -c "linear:" cmd_vel_test.txt 2>/dev/null || echo "0") | |
| # Count movement commands (x > 0) | |
| MOVING=$(grep -c '"x": 0\.[0-9]' cmd_vel_test.txt 2>/dev/null || echo "0") | |
| # Count stop commands (x = 0) | |
| STOPPED=$(grep -c '"x": 0\.0' cmd_vel_test.txt 2>/dev/null || echo "0") | |
| echo "Total commands: $TOTAL" | |
| echo "Movement commands: $MOVING" | |
| echo "Stop commands: $STOPPED" | |
| if [ "$TOTAL" -eq 0 ]; then | |
| echo "❌ PROBLEM: No commands recorded" | |
| echo " Possible issues:" | |
| echo " - Gazebo not running" | |
| echo " - Robot not spawned" | |
| echo " - ROS topics not available" | |
| elif [ "$MOVING" -gt 0 ]; then | |
| echo "✅ SUCCESS: Robot IS moving!" | |
| echo "" | |
| echo "Sample movement values:" | |
| grep '"x": 0\.[0-9]' cmd_vel_test.txt 2>/dev/null | head -3 | sed 's/^/ /' | |
| # Check for turns | |
| TURNS=$(grep -c '"z": 0\.[0-9]\|"z": -0\.[0-9]' cmd_vel_test.txt 2>/dev/null || echo "0") | |
| echo "Turn commands: $TURNS" | |
| # Calculate percentages | |
| if [ "$TOTAL" -gt 0 ]; then | |
| MOVE_PERCENT=$((MOVING * 100 / TOTAL)) | |
| STOP_PERCENT=$((STOPPED * 100 / TOTAL)) | |
| echo "Movement percentage: $MOVE_PERCENT%" | |
| echo "Stop percentage: $STOP_PERCENT%" | |
| fi | |
| else | |
| echo "❌ PROBLEM: Robot NOT moving" | |
| echo "All commands:" | |
| head -5 cmd_vel_test.txt 2>/dev/null | sed 's/^/ /' | |
| echo "" | |
| echo "Check node output:" | |
| tail -5 node_output.log 2>/dev/null | |
| fi | |
| else | |
| echo "❌ No data recorded in cmd_vel_test.txt" | |
| echo "Quick diagnostic:" | |
| echo " Testing /cmd_vel directly..." | |
| timeout 2 ros2 topic echo /cmd_vel --once 2>&1 | head -3 | |
| fi | |
| echo "" | |
| echo "=== FILES CREATED ===" | |
| ls -la | |
| echo "" | |
| echo "Data saved in: $TEST_DIR" | |
| echo "" | |
| echo "📸 Take screenshot of:" | |
| echo "1. This output (especially 'SUCCESS: Robot IS moving!')" | |
| echo "2. Gazebo window showing robot movement" | |
| echo "3. The files listed above" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment