Collection of util functions for TensorFlow datasets, specifically for dealing with robot trajectories, including downsampling and other such nonsense.
import os
import glob
import re
import tensorflow as tf
import json
import numpy as np
from PIL import Image

def make_json_parser(json_file, image_dir, image_size, greyscale_images=False):
    def json_parser():
        with open(json_file, 'r') as file:
            data = json.load(file)
        for timestep in data['timesteps']:
            image_path = os.path.join(image_dir, timestep['image_name'])
            image = Image.open(image_path)
            if greyscale_images:
                image = image.convert('L')
            image = np.array(image.resize(image_size))
            if greyscale_images:
                # Restore the channel axis that convert('L') collapses.
                image = image.reshape((*image_size, 1))
            current_pose = np.array(timestep['current_pose']['data'])
            desired_pose = np.array(timestep['desired_pose']['data'])
            current_gripper = np.array(timestep['current_gripper_position'])
            desired_gripper = np.array(timestep['desired_gripper_position'])
            yield (image, current_pose, desired_pose, current_gripper, desired_gripper)
    return json_parser

def make_timestep_dataset(json_parser):
    return tf.data.Dataset.from_generator(
        json_parser,
        output_signature=(
            tf.TensorSpec(shape=(None, None, None), dtype=tf.uint8),  # image
            tf.TensorSpec(shape=(4, 4), dtype=tf.float32),            # current pose
            tf.TensorSpec(shape=(4, 4), dtype=tf.float32),            # desired pose
            tf.TensorSpec(shape=(), dtype=tf.float32),                # current gripper position
            tf.TensorSpec(shape=(), dtype=tf.float32)                 # desired gripper position
        )
    )

# Variant that yields the image path instead of decoded pixels; image_size and
# greyscale_images are kept for signature parity and are unused here.
def make_json_parser_2(json_file, image_dir, image_size, greyscale_images=False):
    def json_parser():
        with open(json_file, 'r') as file:
            data = json.load(file)
        for timestep in data['timesteps']:
            image_path = os.path.join(image_dir, timestep['image_name'])
            current_pose = np.array(timestep['current_pose']['data'])
            desired_pose = np.array(timestep['desired_pose']['data'])
            current_gripper = np.array(timestep['current_gripper_position'])
            desired_gripper = np.array(timestep['desired_gripper_position'])
            yield (image_path, current_pose, desired_pose, current_gripper, desired_gripper)
    return json_parser

def make_timestep_dataset_2(json_parser):
    return tf.data.Dataset.from_generator(
        json_parser,
        output_signature=(
            tf.TensorSpec(shape=(), dtype=tf.string),       # image path
            tf.TensorSpec(shape=(4, 4), dtype=tf.float32),  # current pose
            tf.TensorSpec(shape=(4, 4), dtype=tf.float32),  # desired pose
            tf.TensorSpec(shape=(), dtype=tf.float32),      # current gripper position
            tf.TensorSpec(shape=(), dtype=tf.float32)       # desired gripper position
        )
    )

# Variant that additionally yields a trajectory id derived from the JSON file name.
def make_json_parser_3(json_file, image_dir, image_size, greyscale_images=False):
    def json_parser():
        with open(json_file, 'r') as file:
            data = json.load(file)
        trajectory_id = os.path.basename(json_file)
        for timestep in data['timesteps']:
            image_path = os.path.join(image_dir, timestep['image_name'])
            current_pose = np.array(timestep['current_pose']['data'])
            desired_pose = np.array(timestep['desired_pose']['data'])
            current_gripper = np.array(timestep['current_gripper_position'])
            desired_gripper = np.array(timestep['desired_gripper_position'])
            yield (trajectory_id, image_path, current_pose, desired_pose, current_gripper, desired_gripper)
    return json_parser

def make_timestep_dataset_3(json_parser):
    return tf.data.Dataset.from_generator(
        json_parser,
        output_signature=(
            tf.TensorSpec(shape=(), dtype=tf.string),       # trajectory id
            tf.TensorSpec(shape=(), dtype=tf.string),       # image path
            tf.TensorSpec(shape=(4, 4), dtype=tf.float32),  # current pose
            tf.TensorSpec(shape=(4, 4), dtype=tf.float32),  # desired pose
            tf.TensorSpec(shape=(), dtype=tf.float32),      # current gripper position
            tf.TensorSpec(shape=(), dtype=tf.float32)       # desired gripper position
        )
    )

def make_trajectory_dataset(timestep_datasets):
    return tf.data.Dataset.from_tensor_slices(timestep_datasets)

def flatten_nested_dataset(dataset):
    return dataset.flat_map(lambda x: x)

def get_files(directory):
    if not os.path.isdir(directory):
        raise ValueError(f"{directory} is not a valid directory.")
    search_path = os.path.join(directory, "*.json")
    files = glob.glob(search_path)
    return files
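
# Minimal usage sketch for the functions above, assuming a layout like
# <data_dir>/trajectories/*.json plus <data_dir>/images/ (placeholder paths,
# not mandated by this file):
def example_build_flattened_dataset(data_dir):
    json_files = get_files(os.path.join(data_dir, "trajectories"))
    image_dir = os.path.join(data_dir, "images")
    timestep_datasets = [
        make_timestep_dataset_3(make_json_parser_3(json_file, image_dir, (100, 100)))
        for json_file in json_files
    ]
    # One dataset of per-trajectory datasets, flattened into a stream of timesteps.
    return flatten_nested_dataset(make_trajectory_dataset(timestep_datasets))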

# Expects a dict mapping field_index -> {"mean": mean, "std": std}.
def make_normalize(normalization_params, normalization_fields):
    @tf.function
    def normalize(value, mean, std):
        # Leave the output at 0 wherever std is too small to divide by safely.
        cond = tf.less_equal(1e-3, std)
        return tf.where(cond, (value - mean) / std, 0.)
    @tf.function
    def normalize_fields(*args):
        return [
            field
            if i not in normalization_fields
            else normalize(
                tf.cast(field, normalization_params[i]["mean"].dtype),
                normalization_params[i]["mean"],
                normalization_params[i]["std"]
            )
            for (i, field) in enumerate(args)
        ]
    return normalize_fields
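
# Minimal usage sketch for make_normalize. The statistics below are made-up
# placeholders; real values would come from a pass over the training data.
def example_normalize(timestep_dataset):
    params = {
        1: {"mean": tf.zeros((4, 4)), "std": tf.ones((4, 4))},  # current pose
        2: {"mean": tf.zeros((4, 4)), "std": tf.ones((4, 4))},  # desired pose
    }
    # Normalize fields 1 and 2; all other fields pass through unchanged.
    return timestep_dataset.map(make_normalize(params, [1, 2]))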

# NOTE: this first variant uses non-overlapping windows (shift=lookahead_steps);
# it is shadowed by the sliding-window redefinition below and kept for reference.
def make_lookahead_dataset(timestep_dataset, tuple_count, lookahead_steps, lookahead_fields):
    def make_split(index):
        @tf.function
        def split(*args):
            return args[index]
        return split
    split_functions = [make_split(i) for i in range(tuple_count)]
    return tf.data.Dataset.zip(*[
        timestep_dataset.map(split_function).window(1, shift=lookahead_steps).flat_map(lambda x: x)
        if i not in lookahead_fields
        else timestep_dataset.map(split_function).window(lookahead_steps, shift=lookahead_steps, stride=1, drop_remainder=True).flat_map(lambda window: window.batch(lookahead_steps))
        for (i, split_function) in enumerate(split_functions)
    ])

# Sliding-window variant (shift=1); this definition supersedes the one above.
def make_lookahead_dataset(timestep_dataset, tuple_count, lookahead_steps, lookahead_fields):
    def make_split(index):
        @tf.function
        def split(*args):
            return args[index]
        return split
    split_functions = [make_split(i) for i in range(tuple_count)]
    return tf.data.Dataset.zip(*[
        timestep_dataset.map(split_function).window(1, shift=1).flat_map(lambda x: x)
        if i not in lookahead_fields
        else timestep_dataset.map(split_function).window(lookahead_steps, shift=1, stride=1, drop_remainder=True).flat_map(lambda window: window.batch(lookahead_steps))
        for (i, split_function) in enumerate(split_functions)
    ])

def make_lookbehind_dataset(timestep_dataset, tuple_count, lookbehind_steps, lookbehind_fields):
    # Pad the front by repeating the first element so early timesteps have a full history window.
    first_element = timestep_dataset.take(1).repeat(lookbehind_steps - 1)
    timestep_dataset = first_element.concatenate(timestep_dataset)
    return make_lookahead_dataset(timestep_dataset, tuple_count, lookbehind_steps, lookbehind_fields)
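
# Minimal usage sketch: for the 5-tuple timestep datasets built above, turn
# field 2 (desired pose) into a sliding lookahead window while the other
# fields stay per-timestep. lookahead_steps=4 is an arbitrary example value.
def example_lookahead(timestep_dataset, lookahead_steps=4):
    return make_lookahead_dataset(timestep_dataset, tuple_count=5,
                                  lookahead_steps=lookahead_steps,
                                  lookahead_fields=[2])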

def make_downsampled_dataset(dataset, skip_count, obs_fields):
    """
    Downsample the dataset so that each window of skip_count timesteps is
    reduced to a single sample, taking the observation fields from the
    window's first timestep (index 0) and the remaining (action) fields from
    its last (index skip_count - 1).
    """
    @tf.function
    def downsample(*args):
        return tf.data.Dataset.zip(*[
            arg.batch(skip_count).map(lambda x: x[0])
            if i in obs_fields
            else arg.batch(skip_count).map(lambda x: x[-1])
            for (i, arg) in enumerate(args)
        ])
    dataset = dataset.window(skip_count, skip_count, drop_remainder=True)
    return dataset.flat_map(downsample)
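
# Minimal usage sketch: treat fields 0, 1, and 3 (image, current pose, current
# gripper) as observations taken from the start of each window, and fields 2
# and 4 (desired pose/gripper) as actions taken from its end. skip_count=3 is
# an arbitrary example value.
def example_downsample(timestep_dataset, skip_count=3):
    return make_downsampled_dataset(timestep_dataset, skip_count, obs_fields=[0, 1, 3])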

# Generally, beta should go from ~0.001 to ~0.02 over the schedule.
# For the robot that I have, I'm going to start with beta_start=0.000001
# given the general sensitivity of the robot itself.
def make_diffusion_dataset(dataset, num_steps, diffusion_fields, beta_start=0.000001, beta_interval=0.000001):
    beta_end = beta_start + beta_interval * num_steps
    betas = tf.linspace(beta_start, beta_end, num_steps - 1)
    @tf.function
    def diffusion_process(x):
        """
        Applies a forward diffusion process to a data point x over num_steps,
        with a linearly increasing beta schedule for the noise variance, in a
        TensorFlow-compatible manner. num_steps and the beta schedule are taken
        from the enclosing scope.

        Parameters:
        - x: TensorFlow tensor, the initial data point.

        Returns:
        - A TensorFlow tensor of shape (num_steps, *x.shape) where each step is
          the result of a diffusion step applied to x, ordered most-noised
          first, compatible with symbolic tensors.
        """
        # Accumulate each diffusion step, starting from the clean data point.
        diffusion_steps = [x]
        for i in range(1, num_steps):
            # Calculate the standard deviation for the current step.
            sigma = tf.sqrt(betas[i - 1])
            noise = tf.random.normal(shape=tf.shape(x), mean=0.0, stddev=sigma)
            x_step = diffusion_steps[-1] + noise
            diffusion_steps.append(x_step)
        # Order the steps most-noised first.
        diffusion_steps.reverse()
        # Stack the diffusion steps to form a tensor of shape (num_steps, *x.shape).
        return tf.stack(diffusion_steps)
    @tf.function
    def diffusion(*args):
        return [diffusion_process(arg) if (i in diffusion_fields) else arg for (i, arg) in enumerate(args)]
    return dataset.map(diffusion)
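
# Minimal usage sketch: attach a forward-diffusion stack to field 2 (desired
# pose), so each element carries a (num_steps, 4, 4) tensor of progressively
# noisier poses, most-noised first. num_steps=10 is an arbitrary example value.
def example_diffusion(timestep_dataset, num_steps=10):
    return make_diffusion_dataset(timestep_dataset, num_steps, diffusion_fields=[2])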

def make_diffusion_dataset_flattened(dataset, diffusion_steps, diffusion_fields):
    # Same as make_diffusion_dataset, but stacks the non-diffused fields so every
    # field gains a leading diffusion_steps axis, then unbatches along that axis.
    diffused = make_diffusion_dataset(dataset, diffusion_steps, diffusion_fields)
    @tf.function
    def broadcast(*args):
        return [arg if i in diffusion_fields else tf.stack([arg] * diffusion_steps)
                for (i, arg) in enumerate(args)]
    return diffused.map(broadcast).unbatch()

import unittest

class TestDataset(unittest.TestCase):
    @staticmethod
    def create_synthetic_timestep_dataset(size, image_size):
        """
        Create a synthetic timestep dataset with the given number of elements.
        """
        values = []
        for i in range(size):
            image = (np.random.rand(*image_size, 3) * 255).astype(np.uint8)
            current_pose = np.random.rand(4, 4).astype(np.float32)
            desired_pose = np.random.rand(4, 4).astype(np.float32)
            current_gripper = np.float32(0.0)
            desired_gripper = np.float32(0.0)
            values.append((image, current_pose, desired_pose, current_gripper, desired_gripper))
        def generator():
            for value in values:
                yield value
        return tf.data.Dataset.from_generator(
            generator,
            output_signature=(
                tf.TensorSpec(shape=(None, None, None), dtype=tf.uint8),
                tf.TensorSpec(shape=(4, 4), dtype=tf.float32),
                tf.TensorSpec(shape=(4, 4), dtype=tf.float32),
                tf.TensorSpec(shape=(), dtype=tf.float32),
                tf.TensorSpec(shape=(), dtype=tf.float32)
            )
        )

    @classmethod
    def get_test_dir(cls):
        current_file_path = os.path.realpath(__file__)
        base_dir = os.path.dirname(current_file_path)
        return os.path.join(base_dir, "test_data")

    @classmethod
    def get_trajectory_files(cls, data_dir):
        data_dir = os.path.join(data_dir, "trajectories")
        json_files = get_files(data_dir)
        return json_files

    @classmethod
    def get_test_trajectory_files(cls):
        return cls.get_trajectory_files(cls.get_test_dir())

    @classmethod
    def get_image_dir(cls, data_dir):
        return os.path.join(data_dir, "images")

    @classmethod
    def get_test_image_dir(cls):
        return cls.get_image_dir(cls.get_test_dir())

    def check_timesteps_shape(self, timestep_dataset):
        count = 0
        for image, current_pose, desired_pose, current_gripper, desired_gripper in timestep_dataset:
            count += 1
            self.assertEqual(image.shape, (100, 100, 3), "Image should have shape (100, 100, 3)")
            self.assertEqual(current_pose.shape, (4, 4), "Current pose should have shape (4, 4)")
            self.assertEqual(desired_pose.shape, (4, 4), "Desired pose should have shape (4, 4)")
        self.assertGreater(count, 0)

    def check_lookahead_timestep_shape(self, lookahead_count, timestep_dataset):
        count = 0
        for image, current_pose, desired_pose, current_gripper, desired_gripper in timestep_dataset:
            count += 1
            self.assertEqual(image.shape, (100, 100, 3), "Image should have shape (100, 100, 3)")
            self.assertEqual(current_pose.shape, (4, 4), "Current pose should have shape (4, 4)")
            self.assertEqual(desired_pose.shape, (lookahead_count, 4, 4), "Desired pose should have shape (lookahead_count, 4, 4)")
        self.assertGreater(count, 0)

    # def test_greyscale_image_generation(self):
    #     json_files = self.get_test_trajectory_files()
    #     self.assertTrue(len(json_files) > 0, "There should be at least one trajectory file")
    #     trajectory_dataset = flatten_nested_dataset(
    #         make_trajectory_dataset([
    #             make_timestep_dataset(make_json_parser(json_file, self.get_test_image_dir(), (100, 100), greyscale_images=True))
    #             for json_file in json_files
    #         ])
    #     )
    #     for image, _, _, _, _ in trajectory_dataset:
    #         self.assertEqual(image.shape[-1], 1, f"Image should be greyscale. found shape {image.shape}")

    # def test_json_dataset(self):
    #     json_files = self.get_test_trajectory_files()
    #     self.assertTrue(len(json_files) > 0, "There should be at least one trajectory file")
    #     trajectory_dataset = flatten_nested_dataset(make_trajectory_dataset([make_timestep_dataset(make_json_parser(json_file, self.get_test_image_dir(), (100, 100))) for json_file in json_files]))
    #     self.check_timesteps_shape(trajectory_dataset)

    # def test_synthetic_timestep_dataset(self):
    #     synthetic_dataset = self.create_synthetic_timestep_dataset(5, (100, 100))
    #     self.check_timesteps_shape(synthetic_dataset)

    # def test_make_lookahead_dataset(self):
    #     dataset_length = 5
    #     lookahead_steps = 2
    #     single_timestep_dataset = self.create_synthetic_timestep_dataset(dataset_length, (100, 100))
    #     single_timestep_data = []
    #     for data in single_timestep_dataset:
    #         single_timestep_data.append(data)
    #     future_dataset = make_lookahead_dataset(single_timestep_dataset, lookahead_steps=lookahead_steps, lookahead_fields=[2])
    #     loop_count = 0
    #     self.check_lookahead_timestep_shape(lookahead_steps, future_dataset)
    #     for i, (future_image, future_current_pose, future_desired_poses, future_current_gripper, future_desired_gripper) in enumerate(future_dataset):
    #         image = single_timestep_data[i][0]
    #         current_pose = single_timestep_data[i][1]
    #         desired_poses = np.array([single_timestep_data[i + j][2] for j in range(lookahead_steps)])
    #         self.assertTrue(np.array_equal(future_image, image))
    #         self.assertTrue(np.array_equal(future_current_pose, current_pose))
    #         self.assertTrue(np.array_equal(future_desired_poses, desired_poses))
    #         loop_count += 1
    #     self.assertEqual(loop_count, dataset_length - lookahead_steps + 1)

    # def test_make_diffusion_dataset(self):
    #     dataset_length = 5
    #     diffusion_steps = 10
    #     single_timestep_dataset = self.create_synthetic_timestep_dataset(dataset_length, (100, 100))
    #     diffusion_dataset = make_diffusion_dataset(single_timestep_dataset, diffusion_steps=diffusion_steps, diffusion_fields=[2])
    #     loop_count = 0
    #     for _, _, diffusion_desired_poses, _, _ in diffusion_dataset:
    #         self.assertEqual(diffusion_desired_poses.shape, (diffusion_steps, 4, 4))
    #         loop_count += 1
    #     self.assertEqual(loop_count, dataset_length)

    def test_make_nested_diffusion_dataset(self):
        dataset_length = 5
        diffusion_steps = 10
        single_timestep_dataset = self.create_synthetic_timestep_dataset(dataset_length, (100, 100))
        diffusion_dataset = make_diffusion_dataset(single_timestep_dataset, num_steps=diffusion_steps, diffusion_fields=[2])
        # loop_count = 0
        # for i in diffusion_dataset:
        #     print(i)
        #     self.assertEqual(diffusion_desired_poses.shape, (diffusion_steps, 4, 4))
        #     loop_count += 1
        # self.assertEqual(loop_count, dataset_length)

    # def test_make_lookahead_diffusion_dataset(self):
    #     dataset_length = 5
    #     lookahead_steps = 2
    #     diffusion_steps = 10
    #     single_timestep_dataset = self.create_synthetic_timestep_dataset(dataset_length, (100, 100))
    #     lookahead_dataset = make_lookahead_dataset(single_timestep_dataset, lookahead_steps=lookahead_steps, lookahead_fields=[2])
    #     diffusion_dataset = make_diffusion_dataset(lookahead_dataset, diffusion_steps=diffusion_steps, diffusion_fields=[2])
    #     loop_count = 0
    #     for _, _, diffusion_desired_poses, _, _ in diffusion_dataset:
    #         self.assertEqual(diffusion_desired_poses.shape, (diffusion_steps, lookahead_steps, 4, 4))
    #         loop_count += 1
    #     self.assertEqual(loop_count, dataset_length - lookahead_steps + 1)

    # def test_generate_trajectory_unflattened_dataset(self):
    #     single_timestep_datasets = [self.create_synthetic_timestep_dataset(5, (100, 100)) for _ in range(5)]
    #     future_steps = 2
    #     future_datasets = [make_lookahead_dataset(single_timestep_dataset, lookahead_steps=future_steps, lookahead_fields=[2]) for single_timestep_dataset in single_timestep_datasets]
    #     trajectory_dataset = make_trajectory_dataset(future_datasets)
    #     for timesteps in trajectory_dataset:
    #         self.check_lookahead_timestep_shape(future_steps, timesteps)

    # def test_generate_trajectory_flattened_dataset(self):
    #     single_timestep_datasets = [self.create_synthetic_timestep_dataset(5, (100, 100)) for _ in range(5)]
    #     future_steps = 2
    #     future_datasets = [make_lookahead_dataset(single_timestep_dataset, lookahead_steps=future_steps, lookahead_fields=[2]) for single_timestep_dataset in single_timestep_datasets]
    #     trajectory_dataset = make_trajectory_dataset(future_datasets)
    #     flattened_dataset = flatten_nested_dataset(trajectory_dataset)
    #     self.check_lookahead_timestep_shape(future_steps, flattened_dataset)

if __name__ == '__main__':
    unittest.main()
This stuff really needs to get cleaned up, but most of it has been tested. I've been using the *_3 functions extensively for my current trajectory modeling work.