Created
September 9, 2025 08:45
-
-
Save moooeeeep/061c9d4d35475364af09f70e8ac2c174 to your computer and use it in GitHub Desktop.
Formatted dump of topic data (yaml or csv) from ROS 2 bagfile
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python3 | |
| import argparse | |
| import sys | |
| import yaml | |
| from rclpy.serialization import deserialize_message | |
| from rosidl_runtime_py.utilities import get_message | |
| import rosbag2_py | |
| def export_dict(msg): | |
| fields = {} | |
| for field_name in msg.get_fields_and_field_types(): | |
| # process recursively | |
| msg_field = getattr(msg, field_name) | |
| if hasattr(msg_field, "get_fields_and_field_types"): | |
| # handle nested messages | |
| fields[field_name] = export_dict(msg_field) | |
| elif isinstance(msg_field, (list, tuple)): | |
| # handle lists | |
| fields[field_name] = [export_dict(i) for i in msg_field] | |
| else: | |
| # handle simple fields | |
| fields[field_name] = msg_field | |
| return fields | |
| def flatten_fields(d, namespace=None): | |
| flat_fields = [] | |
| for field_name, field in d.items(): | |
| # flatten field keys | |
| nested_field_name = ( | |
| f"{namespace}.{field_name}" | |
| if namespace | |
| else field_name | |
| ) | |
| # process recursively | |
| if isinstance(field, dict): | |
| # handle nested messages | |
| flat_fields += flatten_fields(field, nested_field_name) | |
| elif isinstance(field, (list, tuple)): | |
| # handle lists | |
| for idx,i in enumerate(field): | |
| flat_fields += flatten_fields(i, f"{nested_field_name}.{idx}") | |
| else: | |
| # handle simple fields | |
| flat_fields += [{nested_field_name: field}] | |
| return flat_fields | |
| def main(args): | |
| storage_id = args.storage | |
| if storage_id == "auto": | |
| # autodetect from file extension | |
| if args.path.endswith(".db3"): | |
| storage_id = "sqlite3" | |
| elif args.path.endswith(".mcap"): | |
| storage_id = "mcap" | |
| else: | |
| raise NotImplementedError("Unable to autodetect storage_id") | |
| storage_options = rosbag2_py.StorageOptions( | |
| uri=args.path, | |
| storage_id=storage_id, | |
| ) | |
| converter_options = rosbag2_py.ConverterOptions( | |
| input_serialization_format=args.input_serialization_format, | |
| output_serialization_format=args.output_serialization_format, | |
| ) | |
| reader = rosbag2_py.SequentialReader() | |
| reader.open(storage_options, converter_options) | |
| storage_filter = rosbag2_py.StorageFilter(topics=[args.topic]) | |
| reader.set_filter(storage_filter) | |
| type_map = {i.name: i.type for i in reader.get_all_topics_and_types()} | |
| try: | |
| msg_type = get_message(type_map[args.topic]) | |
| except KeyError: | |
| print("topic not found", file=sys.stderr) | |
| raise SystemExit(-1) | |
| header = None | |
| while reader.has_next(): | |
| topic, data, t = reader.read_next() | |
| msg_deserialized = deserialize_message(data, msg_type) | |
| exported = export_dict(msg_deserialized) | |
| if not args.print_csv: | |
| print(yaml.dump(exported)) | |
| else: | |
| exported = flatten_fields(exported) | |
| if header is None: | |
| # print header as first output row | |
| header = [list(i.keys())[0] for i in exported] | |
| print(",".join(header)) | |
| print(",".join(str(list(i.values())[0]) for i in exported)) | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser( | |
| description='dump topic data from bagfile', | |
| ) | |
| parser.add_argument("-b", "--path", type=str, required=True) | |
| parser.add_argument("--storage", type=str, default="auto") | |
| parser.add_argument("--input-serialization-format", type=str, default="") | |
| parser.add_argument("--output-serialization-format", type=str, default="") | |
| parser.add_argument("-p", dest="print_csv", action='store_true', default=False) | |
| parser.add_argument("topic", type=str) | |
| args = parser.parse_args(sys.argv[1:]) | |
| main(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment