Skip to content

Instantly share code, notes, and snippets.

@szobov
Created January 8, 2026 09:59
Show Gist options
  • Select an option

  • Save szobov/0aca7236322328b0947d9abbebcc74c2 to your computer and use it in GitHub Desktop.

Select an option

Save szobov/0aca7236322328b0947d9abbebcc74c2 to your computer and use it in GitHub Desktop.
Reproducer for rclpy issue #1455
"""
Reproducer for rclpy issue #1455:
"Failed to get number of ready entities for action client:
wait set index for status subscription is out of bounds"
GitHub Issue: https://github.com/ros2/rclpy/issues/1455
ROOT CAUSE:
The race condition occurs when multiple threads create different wait_sets:
1. Thread A calls add_to_wait_set(wait_set_A) -> stores indices in action_client
2. Thread B creates wait_set_B with different entity count
3. is_ready(wait_set_B) uses cached indices from wait_set_A -> "out of bounds"
USAGE:
python3 reproduce_wait_set_bug.py
The bug typically reproduces within 1-3 seconds.
"""
import contextlib
import random
import threading
import time
import traceback
import rclpy
from example_interfaces.action import Fibonacci
from rclpy.action.client import ActionClient
from rclpy.action.server import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
# Global state for bug detection
error_detected = threading.Event()
def check_bug(e: Exception, location: str) -> bool:
error_msg = str(e).lower()
if "wait set index" in error_msg or "out of bounds" in error_msg:
print(f"\n{'=' * 70}")
print(f"BUG REPRODUCED at {location}!")
print(f"{'=' * 70}")
print(f"Exception: {e}")
print(f"Traceback:\n{traceback.format_exc()}")
print(f"{'=' * 70}\n")
error_detected.set()
return True
return False
class ActionServerNode(Node):
def __init__(self) -> None:
super().__init__("action_server_node")
self._action_server = ActionServer(
self,
Fibonacci,
"fibonacci",
self.execute_callback,
callback_group=ReentrantCallbackGroup(),
)
def execute_callback(self, goal_handle):
feedback_msg = Fibonacci.Feedback()
feedback_msg.sequence = [0, 1]
for _ in range(min(goal_handle.request.order, 3)):
feedback_msg.sequence.append(
feedback_msg.sequence[-1] + feedback_msg.sequence[-2]
)
with contextlib.suppress(Exception):
goal_handle.publish_feedback(feedback_msg)
time.sleep(random.uniform(0.0001, 0.001))
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.sequence
return result
class ClientNode(Node):
def __init__(self, node_id: int) -> None:
super().__init__(f"client_{node_id}")
self.callback_group = ReentrantCallbackGroup()
self._action_client = ActionClient(
self, Fibonacci, "fibonacci", callback_group=self.callback_group
)
# Add varying subscriptions to create different wait_set sizes per node
for i in range(node_id + 1):
self.create_subscription(
Fibonacci.Feedback,
f"/dummy_{node_id}_{i}",
lambda _: None,
10,
callback_group=self.callback_group,
)
self.create_timer(0.01, self.send_goal, callback_group=self.callback_group)
self.pending = 0
def send_goal(self) -> None:
if error_detected.is_set() or self.pending >= 5:
return
try:
if not self._action_client.server_is_ready():
return
except Exception as e:
check_bug(e, "send_goal:server_is_ready")
return
goal = Fibonacci.Goal()
goal.order = random.randint(2, 5)
self.pending += 1
try:
future = self._action_client.send_goal_async(goal)
future.add_done_callback(self.goal_response_cb)
except Exception as e:
self.pending -= 1
check_bug(e, "send_goal:send_goal_async")
def goal_response_cb(self, future) -> None:
if error_detected.is_set():
self.pending -= 1
return
try:
goal_handle = future.result()
if goal_handle.accepted:
result_future = goal_handle.get_result_async()
result_future.add_done_callback(lambda _: self._decrement_pending())
else:
self.pending -= 1
except Exception as e:
self.pending -= 1
check_bug(e, "goal_response_cb")
def _decrement_pending(self) -> None:
self.pending -= 1
class SpinFutureThread(threading.Thread):
def __init__(self, executor: MultiThreadedExecutor, node: Node, name: str) -> None:
super().__init__(name=name, daemon=True)
self.executor = executor
self.node = node
self.running = True
def run(self) -> None:
client = ActionClient(
self.node, Fibonacci, "fibonacci", callback_group=ReentrantCallbackGroup()
)
while self.running and not error_detected.is_set():
try:
if client.server_is_ready():
break
except Exception as e:
if check_bug(e, f"{self.name}:server_is_ready"):
return
time.sleep(0.1)
while self.running and not error_detected.is_set():
goal = Fibonacci.Goal()
goal.order = random.randint(2, 4)
try:
future = client.send_goal_async(goal)
# This creates a competing wait_set in a different thread
self.executor.spin_until_future_complete(future, timeout_sec=1.0)
if future.done() and not future.cancelled():
goal_handle = future.result()
if goal_handle and goal_handle.accepted:
result_future = goal_handle.get_result_async()
self.executor.spin_until_future_complete(
result_future, timeout_sec=2.0
)
except Exception as e:
if check_bug(e, f"{self.name}:spin_until_future_complete"):
return
time.sleep(0.001)
def stop(self) -> None:
self.running = False
class SpinThread(threading.Thread):
def __init__(self, executor: MultiThreadedExecutor, name: str) -> None:
super().__init__(name=name, daemon=True)
self.executor = executor
self.running = True
def run(self) -> None:
while self.running and not error_detected.is_set():
try:
self.executor.spin_once(timeout_sec=0.001)
except Exception as e:
if check_bug(e, f"{self.name}:spin_once"):
return
def stop(self) -> None:
self.running = False
def main() -> None:
rclpy.init()
executor: MultiThreadedExecutor | None = None
try:
server_node = ActionServerNode()
client_nodes = [ClientNode(i) for i in range(5)]
executor = MultiThreadedExecutor(num_threads=12)
executor.add_node(server_node)
for node in client_nodes:
executor.add_node(node)
# Threads calling spin_until_future_complete (key to triggering bug)
spin_future_threads = [
SpinFutureThread(executor, node, f"SpinFuture-{i}")
for i, node in enumerate(client_nodes[:3])
]
for t in spin_future_threads:
t.start()
# Additional concurrent spin threads
spin_threads = [SpinThread(executor, f"Spin-{i}") for i in range(4)]
for t in spin_threads:
t.start()
# Main thread spin loop
start_time = time.time()
while not error_detected.is_set():
try:
executor.spin_once(timeout_sec=0.001)
except KeyboardInterrupt:
print("\nStopped by user")
break
except Exception as e:
if check_bug(e, "main:spin_once"):
break
for t in spin_future_threads + spin_threads:
t.stop()
for t in spin_future_threads + spin_threads:
t.join(timeout=1.0)
if error_detected.is_set():
print(f"\nSUCCESS: Bug reproduced in {time.time() - start_time:.1f}s")
else:
print(f"\nBug not reproduced after {time.time() - start_time:.1f}s")
finally:
if executor is not None:
with contextlib.suppress(Exception):
executor.shutdown()
rclpy.shutdown()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment