Reproducer for rclpy issue #1455
| """ | |
| Reproducer for rclpy issue #1455: | |
| "Failed to get number of ready entities for action client: | |
| wait set index for status subscription is out of bounds" | |
| GitHub Issue: https://github.com/ros2/rclpy/issues/1455 | |
| ROOT CAUSE: | |
| The race condition occurs when multiple threads create different wait_sets: | |
| 1. Thread A calls add_to_wait_set(wait_set_A) -> stores indices in action_client | |
| 2. Thread B creates wait_set_B with different entity count | |
| 3. is_ready(wait_set_B) uses cached indices from wait_set_A -> "out of bounds" | |
| USAGE: | |
| python3 reproduce_wait_set_bug.py | |
| The bug typically reproduces within 1-3 seconds. | |
| """ | |
import contextlib
import random
import threading
import time
import traceback

import rclpy
from example_interfaces.action import Fibonacci
from rclpy.action.client import ActionClient
from rclpy.action.server import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node

# Global state for bug detection
error_detected = threading.Event()

def check_bug(e: Exception, location: str) -> bool:
    error_msg = str(e).lower()
    if "wait set index" in error_msg or "out of bounds" in error_msg:
        print(f"\n{'=' * 70}")
        print(f"BUG REPRODUCED at {location}!")
        print(f"{'=' * 70}")
        print(f"Exception: {e}")
        print(f"Traceback:\n{traceback.format_exc()}")
        print(f"{'=' * 70}\n")
        error_detected.set()
        return True
    return False

class ActionServerNode(Node):
    def __init__(self) -> None:
        super().__init__("action_server_node")
        self._action_server = ActionServer(
            self,
            Fibonacci,
            "fibonacci",
            self.execute_callback,
            callback_group=ReentrantCallbackGroup(),
        )

    def execute_callback(self, goal_handle):
        feedback_msg = Fibonacci.Feedback()
        feedback_msg.sequence = [0, 1]
        for _ in range(min(goal_handle.request.order, 3)):
            feedback_msg.sequence.append(
                feedback_msg.sequence[-1] + feedback_msg.sequence[-2]
            )
            with contextlib.suppress(Exception):
                goal_handle.publish_feedback(feedback_msg)
            time.sleep(random.uniform(0.0001, 0.001))
        goal_handle.succeed()
        result = Fibonacci.Result()
        result.sequence = feedback_msg.sequence
        return result

class ClientNode(Node):
    def __init__(self, node_id: int) -> None:
        super().__init__(f"client_{node_id}")
        self.callback_group = ReentrantCallbackGroup()
        self._action_client = ActionClient(
            self, Fibonacci, "fibonacci", callback_group=self.callback_group
        )
        # Add varying subscriptions to create different wait_set sizes per node
        for i in range(node_id + 1):
            self.create_subscription(
                Fibonacci.Feedback,
                f"/dummy_{node_id}_{i}",
                lambda _: None,
                10,
                callback_group=self.callback_group,
            )
        self.create_timer(0.01, self.send_goal, callback_group=self.callback_group)
        self.pending = 0

    def send_goal(self) -> None:
        if error_detected.is_set() or self.pending >= 5:
            return
        try:
            if not self._action_client.server_is_ready():
                return
        except Exception as e:
            check_bug(e, "send_goal:server_is_ready")
            return
        goal = Fibonacci.Goal()
        goal.order = random.randint(2, 5)
        self.pending += 1
        try:
            future = self._action_client.send_goal_async(goal)
            future.add_done_callback(self.goal_response_cb)
        except Exception as e:
            self.pending -= 1
            check_bug(e, "send_goal:send_goal_async")

    def goal_response_cb(self, future) -> None:
        if error_detected.is_set():
            self.pending -= 1
            return
        try:
            goal_handle = future.result()
            if goal_handle.accepted:
                result_future = goal_handle.get_result_async()
                result_future.add_done_callback(lambda _: self._decrement_pending())
            else:
                self.pending -= 1
        except Exception as e:
            self.pending -= 1
            check_bug(e, "goal_response_cb")

    def _decrement_pending(self) -> None:
        self.pending -= 1

class SpinFutureThread(threading.Thread):
    def __init__(self, executor: MultiThreadedExecutor, node: Node, name: str) -> None:
        super().__init__(name=name, daemon=True)
        self.executor = executor
        self.node = node
        self.running = True

    def run(self) -> None:
        client = ActionClient(
            self.node, Fibonacci, "fibonacci", callback_group=ReentrantCallbackGroup()
        )
        while self.running and not error_detected.is_set():
            try:
                if client.server_is_ready():
                    break
            except Exception as e:
                if check_bug(e, f"{self.name}:server_is_ready"):
                    return
            time.sleep(0.1)
        while self.running and not error_detected.is_set():
            goal = Fibonacci.Goal()
            goal.order = random.randint(2, 4)
            try:
                future = client.send_goal_async(goal)
                # This creates a competing wait_set in a different thread
                self.executor.spin_until_future_complete(future, timeout_sec=1.0)
                if future.done() and not future.cancelled():
                    goal_handle = future.result()
                    if goal_handle and goal_handle.accepted:
                        result_future = goal_handle.get_result_async()
                        self.executor.spin_until_future_complete(
                            result_future, timeout_sec=2.0
                        )
            except Exception as e:
                if check_bug(e, f"{self.name}:spin_until_future_complete"):
                    return
            time.sleep(0.001)

    def stop(self) -> None:
        self.running = False

class SpinThread(threading.Thread):
    def __init__(self, executor: MultiThreadedExecutor, name: str) -> None:
        super().__init__(name=name, daemon=True)
        self.executor = executor
        self.running = True

    def run(self) -> None:
        while self.running and not error_detected.is_set():
            try:
                self.executor.spin_once(timeout_sec=0.001)
            except Exception as e:
                if check_bug(e, f"{self.name}:spin_once"):
                    return

    def stop(self) -> None:
        self.running = False

def main() -> None:
    rclpy.init()
    executor: MultiThreadedExecutor | None = None
    try:
        server_node = ActionServerNode()
        client_nodes = [ClientNode(i) for i in range(5)]
        executor = MultiThreadedExecutor(num_threads=12)
        executor.add_node(server_node)
        for node in client_nodes:
            executor.add_node(node)
        # Threads calling spin_until_future_complete (key to triggering bug)
        spin_future_threads = [
            SpinFutureThread(executor, node, f"SpinFuture-{i}")
            for i, node in enumerate(client_nodes[:3])
        ]
        for t in spin_future_threads:
            t.start()
        # Additional concurrent spin threads
        spin_threads = [SpinThread(executor, f"Spin-{i}") for i in range(4)]
        for t in spin_threads:
            t.start()
        # Main thread spin loop
        start_time = time.time()
        while not error_detected.is_set():
            try:
                executor.spin_once(timeout_sec=0.001)
            except KeyboardInterrupt:
                print("\nStopped by user")
                break
            except Exception as e:
                if check_bug(e, "main:spin_once"):
                    break
        for t in spin_future_threads + spin_threads:
            t.stop()
        for t in spin_future_threads + spin_threads:
            t.join(timeout=1.0)
        if error_detected.is_set():
            print(f"\nSUCCESS: Bug reproduced in {time.time() - start_time:.1f}s")
        else:
            print(f"\nBug not reproduced after {time.time() - start_time:.1f}s")
    finally:
        if executor is not None:
            with contextlib.suppress(Exception):
                executor.shutdown()
        rclpy.shutdown()


if __name__ == "__main__":
    main()
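For readers who want the failure mode in isolation, below is a minimal, self-contained sketch of the pattern described under ROOT CAUSE. It is a toy analogue, not rclpy's actual classes or internals: ToyWaitSet, ToyActionClient, and their index bookkeeping are illustrative assumptions, but the shape of the race (an index cached while joining one wait set, then reused by a readiness check against a differently sized wait set) mirrors the explanation in the docstring above.

"""Toy analogue of the cached wait-set-index race (illustrative only, not rclpy)."""
import threading


class ToyWaitSet:
    def __init__(self, size: int) -> None:
        self.ready = [False] * size  # one slot per entity in this wait set


class ToyActionClient:
    """Caches the slot index handed out by the last wait set it joined."""

    def __init__(self) -> None:
        self._cached_index: int | None = None

    def add_to_wait_set(self, wait_set: ToyWaitSet) -> None:
        # "Thread A": remember the index assigned by *this* wait set.
        self._cached_index = len(wait_set.ready) - 1

    def is_ready(self, wait_set: ToyWaitSet) -> bool:
        # "Thread B": the cached index may belong to a different, larger wait set.
        if self._cached_index is None or self._cached_index >= len(wait_set.ready):
            raise RuntimeError(
                "wait set index for status subscription is out of bounds"
            )
        return wait_set.ready[self._cached_index]


client = ToyActionClient()

# Thread A joins a wait set with 8 entities and caches index 7.
t_a = threading.Thread(target=lambda: client.add_to_wait_set(ToyWaitSet(size=8)))
t_a.start()
t_a.join()

try:
    # A second caller (run inline here for determinism) checks readiness against
    # a smaller wait set and trips over the stale cached index.
    client.is_ready(ToyWaitSet(size=3))
except RuntimeError as exc:
    print(f"reproduced the shape of the error: {exc}")

The real reproducer needs many concurrent spinners because rclpy only hits this interleaving occasionally; the toy version forces the stale-index case directly so the mechanism is easy to see.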