Skip to content

Instantly share code, notes, and snippets.

@moooeeeep
Created September 9, 2025 08:45
Show Gist options
  • Select an option

  • Save moooeeeep/061c9d4d35475364af09f70e8ac2c174 to your computer and use it in GitHub Desktop.

Select an option

Save moooeeeep/061c9d4d35475364af09f70e8ac2c174 to your computer and use it in GitHub Desktop.
Formatted dump of topic data (YAML or CSV) from a ROS 2 bag file
#! /usr/bin/env python3
import argparse
import csv
import sys

import rosbag2_py
import yaml
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
def _export_value(value):
    """Convert a single field value into plain Python data.

    Nested messages become dicts, lists/tuples become lists (converted
    element by element), array-like containers are unpacked via ``tolist()``
    and everything else is returned unchanged.
    """
    if hasattr(value, "get_fields_and_field_types"):
        # nested message
        return export_dict(value)
    if isinstance(value, (list, tuple)):
        # elements may be messages *or* primitives, so convert each one
        # individually instead of assuming they are all messages
        return [_export_value(item) for item in value]
    if hasattr(value, "tolist"):
        # NOTE(review): presumably numeric sequences arrive as array.array or
        # numpy.ndarray in generated message classes -- confirm; both expose
        # tolist(), which yields a plain list that yaml/csv can render
        return value.tolist()
    # simple field
    return value


def export_dict(msg):
    """Recursively convert a message object into a plain dictionary.

    Args:
        msg: Object providing ``get_fields_and_field_types()``; every name it
            yields is read from ``msg`` with ``getattr``.

    Returns:
        dict mapping each field name to its converted value: a dict for
        nested messages, a list for sequences, the raw value otherwise.
    """
    return {
        field_name: _export_value(getattr(msg, field_name))
        for field_name in msg.get_fields_and_field_types()
    }
def _flatten_value(value, name):
    """Flatten one value into a list of single-entry ``{name: scalar}`` dicts."""
    if isinstance(value, dict):
        # nested message
        return flatten_fields(value, name)
    if isinstance(value, (list, tuple)):
        # sequence: elements may be dicts, further sequences or primitives;
        # each gets its index appended to the dotted name
        flat = []
        for idx, item in enumerate(value):
            flat.extend(_flatten_value(item, f"{name}.{idx}"))
        return flat
    # simple field
    return [{name: value}]


def flatten_fields(d, namespace=None):
    """Flatten a nested dictionary into dotted-name/value pairs.

    Args:
        d: Dictionary whose values may be scalars, dicts, or lists/tuples of
            scalars, dicts or further lists.
        namespace: Optional prefix; when truthy it is prepended to every key
            as ``"<namespace>.<key>"``.

    Returns:
        List of single-entry dicts ``{dotted_name: value}`` in traversal
        order. List elements are addressed by their index, e.g. ``"a.0.x"``.
        Empty dicts and empty lists contribute no entries.
    """
    flat_fields = []
    for field_name, field in d.items():
        nested_field_name = (
            f"{namespace}.{field_name}" if namespace else field_name
        )
        flat_fields.extend(_flatten_value(field, nested_field_name))
    return flat_fields
def main(args):
    """Print every message of one topic from a bag file to stdout.

    Messages are printed as YAML by default, or as CSV rows (preceded by one
    header row) when ``args.print_csv`` is set.

    Args:
        args: Parsed command-line namespace providing ``path``, ``storage``,
            ``input_serialization_format``, ``output_serialization_format``,
            ``topic`` and ``print_csv``.

    Raises:
        NotImplementedError: If ``args.storage`` is ``"auto"`` and the path
            ends in neither ``.db3`` nor ``.mcap``.
        SystemExit: With status -1 if the topic is not listed in the bag.
    """
    storage_id = args.storage
    if storage_id == "auto":
        # autodetect from file extension
        if args.path.endswith(".db3"):
            storage_id = "sqlite3"
        elif args.path.endswith(".mcap"):
            storage_id = "mcap"
        else:
            raise NotImplementedError("Unable to autodetect storage_id")

    storage_options = rosbag2_py.StorageOptions(
        uri=args.path,
        storage_id=storage_id,
    )
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format=args.input_serialization_format,
        output_serialization_format=args.output_serialization_format,
    )
    reader = rosbag2_py.SequentialReader()
    reader.open(storage_options, converter_options)
    reader.set_filter(rosbag2_py.StorageFilter(topics=[args.topic]))

    type_map = {i.name: i.type for i in reader.get_all_topics_and_types()}
    try:
        # keep the try body to the lookup so only a missing topic is reported
        type_name = type_map[args.topic]
    except KeyError:
        print("topic not found", file=sys.stderr)
        raise SystemExit(-1) from None
    msg_type = get_message(type_name)

    # csv.writer quotes values containing commas, quotes or newlines, which a
    # plain ",".join() would silently turn into extra/broken columns.
    # lineterminator="\n" keeps the line endings that print() produced.
    csv_writer = (
        csv.writer(sys.stdout, lineterminator="\n") if args.print_csv else None
    )
    header_written = False
    while reader.has_next():
        _topic, data, _timestamp = reader.read_next()
        exported = export_dict(deserialize_message(data, msg_type))
        if csv_writer is None:
            print(yaml.dump(exported))
            continue
        flat = flatten_fields(exported)
        if not header_written:
            # NOTE: the header is derived from the first message only; later
            # messages whose sequences differ in length yield rows with a
            # different number of columns.
            csv_writer.writerow([next(iter(cell)) for cell in flat])
            header_written = True
        # each entry of ``flat`` is a single-key dict; write its only value
        csv_writer.writerow([str(next(iter(cell.values()))) for cell in flat])
if __name__ == "__main__":
    # Command-line interface: bag location, storage and serialization
    # options, the CSV switch, and the topic to dump.
    cli = argparse.ArgumentParser(description='dump topic data from bagfile')
    cli.add_argument("-b", "--path", required=True, type=str)
    cli.add_argument("--storage", default="auto", type=str)
    cli.add_argument("--input-serialization-format", default="", type=str)
    cli.add_argument("--output-serialization-format", default="", type=str)
    # store_true already defaults to False, so no explicit default is needed
    cli.add_argument("-p", action="store_true", dest="print_csv")
    cli.add_argument("topic", type=str)
    # parse_args() without arguments reads sys.argv[1:]
    args = cli.parse_args()
    main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment