Skip to content

Instantly share code, notes, and snippets.

@vyeevani
Created May 14, 2024 03:36
Show Gist options
  • Select an option

  • Save vyeevani/6046611c698b5c00d8aacc93b0b20f2e to your computer and use it in GitHub Desktop.

Select an option

Save vyeevani/6046611c698b5c00d8aacc93b0b20f2e to your computer and use it in GitHub Desktop.
Collection of utility functions for TensorFlow datasets, specifically for handling robot trajectories, including downsampling and other such transformations.
import os
import glob
import re
import tensorflow as tf
import json
import numpy as np
from PIL import Image
def make_json_parser(json_file, image_dir, image_size, greyscale_images=False):
    """Build a generator factory that yields fully loaded timesteps from a trajectory JSON file.

    The JSON file is read with ``json.load`` and must contain a ``"timesteps"`` list whose
    entries provide ``image_name``, ``current_pose["data"]``, ``desired_pose["data"]``,
    ``current_gripper_position`` and ``desired_gripper_position``.

    Args:
        json_file: Path to the trajectory JSON file.
        image_dir: Directory that each ``image_name`` is joined onto.
        image_size: Size passed to ``PIL.Image.resize``, i.e. ``(width, height)``.
        greyscale_images: If True, convert each image to single-channel ("L") and
            append a trailing channel axis so the array is ``(height, width, 1)``.

    Returns:
        A zero-argument generator function (suitable for
        ``tf.data.Dataset.from_generator``). Each yielded item is the tuple
        ``(image, current_pose, desired_pose, current_gripper, desired_gripper)``
        of numpy arrays.
    """
    def json_parser():
        with open(json_file, 'r') as file:
            data = json.load(file)
        for timestep in data['timesteps']:
            image_path = os.path.join(image_dir, timestep['image_name'])
            # Open the image as a context manager so its file handle is released
            # right after decoding instead of lingering until garbage collection.
            with Image.open(image_path) as image:
                if greyscale_images:
                    image = image.convert('L')
                image = np.array(image.resize(image_size))
            if greyscale_images:
                # PIL sizes are (width, height) while numpy arrays are (height, width).
                # Appending the channel axis keeps pixels in place; the previous
                # reshape to (*image_size, 1) scrambled non-square images.
                image = image[..., np.newaxis]
            current_pose = np.array(timestep['current_pose']['data'])
            desired_pose = np.array(timestep['desired_pose']['data'])
            current_gripper = np.array(timestep['current_gripper_position'])
            desired_gripper = np.array(timestep['desired_gripper_position'])
            yield (image, current_pose, desired_pose, current_gripper, desired_gripper)
    return json_parser
def make_timestep_dataset(json_parser):
    """Wrap a ``make_json_parser`` generator function in a ``tf.data.Dataset``.

    Each dataset element is a 5-tuple:
      * image: uint8 tensor of rank 3 with dynamic dimensions,
      * current pose: float32 tensor of shape (4, 4),
      * desired pose: float32 tensor of shape (4, 4),
      * current gripper position: float32 scalar,
      * desired gripper position: float32 scalar.
    """
    image_spec = tf.TensorSpec(shape=(None, None, None), dtype=tf.uint8)
    pose_spec = tf.TensorSpec(shape=(4, 4), dtype=tf.float32)
    gripper_spec = tf.TensorSpec(shape=(), dtype=tf.float32)
    signature = (image_spec, pose_spec, pose_spec, gripper_spec, gripper_spec)
    return tf.data.Dataset.from_generator(json_parser, output_signature=signature)
def make_json_parser_2(json_file, image_dir, image_size, greyscale_images=False):
    """Build a generator factory that yields timesteps carrying image *paths*, not pixels.

    ``image_size`` and ``greyscale_images`` are accepted only so the signature matches
    ``make_json_parser``; they are not used here.

    Returns:
        A zero-argument generator function. Each yielded item is
        ``(image_path, current_pose, desired_pose, current_gripper, desired_gripper)``
        where ``image_path`` is ``image_dir`` joined with the timestep's ``image_name``
        and the remaining entries are numpy arrays built from the JSON values.
    """
    def json_parser():
        with open(json_file, 'r') as handle:
            steps = json.load(handle)['timesteps']
        for step in steps:
            yield (
                os.path.join(image_dir, step['image_name']),
                np.array(step['current_pose']['data']),
                np.array(step['desired_pose']['data']),
                np.array(step['current_gripper_position']),
                np.array(step['desired_gripper_position']),
            )
    return json_parser
def make_timestep_dataset_2(json_parser):
    """Wrap a ``make_json_parser_2`` generator function in a ``tf.data.Dataset``.

    Each dataset element is a 5-tuple:
      * image path: scalar tf.string,
      * current pose: float32 tensor of shape (4, 4),
      * desired pose: float32 tensor of shape (4, 4),
      * current gripper position: float32 scalar,
      * desired gripper position: float32 scalar.
    """
    path_spec = tf.TensorSpec(shape=(), dtype=tf.string)
    pose_spec = tf.TensorSpec(shape=(4, 4), dtype=tf.float32)
    gripper_spec = tf.TensorSpec(shape=(), dtype=tf.float32)
    signature = (path_spec, pose_spec, pose_spec, gripper_spec, gripper_spec)
    return tf.data.Dataset.from_generator(json_parser, output_signature=signature)
def make_json_parser_3(json_file, image_dir, image_size, greyscale_images=False):
    """Build a generator factory yielding timesteps tagged with their trajectory id.

    The trajectory id is the base name of ``json_file``. ``image_size`` and
    ``greyscale_images`` are accepted only for signature parity with
    ``make_json_parser``; they are not used here.

    Returns:
        A zero-argument generator function. Each yielded item is
        ``(trajectory_id, image_path, current_pose, desired_pose, current_gripper,
        desired_gripper)`` where ``image_path`` is ``image_dir`` joined with the
        timestep's ``image_name`` and the last four entries are numpy arrays.
    """
    def json_parser():
        with open(json_file, 'r') as file:
            data = json.load(file)
        # The id depends only on the file name, so compute it once rather than
        # once per timestep.
        trajectory_id = os.path.basename(json_file)
        for timestep in data['timesteps']:
            image_path = os.path.join(image_dir, timestep['image_name'])
            current_pose = np.array(timestep['current_pose']['data'])
            desired_pose = np.array(timestep['desired_pose']['data'])
            current_gripper = np.array(timestep['current_gripper_position'])
            desired_gripper = np.array(timestep['desired_gripper_position'])
            yield (trajectory_id, image_path, current_pose, desired_pose, current_gripper, desired_gripper)
    return json_parser
def make_timestep_dataset_3(json_parser):
    """Wrap a ``make_json_parser_3`` generator function in a ``tf.data.Dataset``.

    Each dataset element is a 6-tuple:
      * trajectory id: scalar tf.string,
      * image path: scalar tf.string,
      * current pose: float32 tensor of shape (4, 4),
      * desired pose: float32 tensor of shape (4, 4),
      * current gripper position: float32 scalar,
      * desired gripper position: float32 scalar.
    """
    string_spec = tf.TensorSpec(shape=(), dtype=tf.string)
    pose_spec = tf.TensorSpec(shape=(4, 4), dtype=tf.float32)
    gripper_spec = tf.TensorSpec(shape=(), dtype=tf.float32)
    signature = (
        string_spec,   # trajectory id
        string_spec,   # image path
        pose_spec,     # current pose
        pose_spec,     # desired pose
        gripper_spec,  # current gripper position
        gripper_spec,  # desired gripper position
    )
    return tf.data.Dataset.from_generator(json_parser, output_signature=signature)
def make_trajectory_dataset(timestep_datasets):
    """Slice a collection of per-trajectory datasets into a dataset of datasets.

    Each item of ``timestep_datasets`` becomes one element of the result; use
    ``flatten_nested_dataset`` to merge them into a single stream of timesteps.
    """
    from_slices = tf.data.Dataset.from_tensor_slices
    return from_slices(timestep_datasets)
def flatten_nested_dataset(dataset):
    """Concatenate the inner datasets of a dataset-of-datasets, in order, via ``flat_map``."""
    def _passthrough(inner_dataset):
        return inner_dataset
    return dataset.flat_map(_passthrough)
def get_files(directory):
    """Return the ``*.json`` files directly inside ``directory``, sorted by path.

    Args:
        directory: Directory to search (not recursive).

    Returns:
        A sorted list of paths to the JSON files.

    Raises:
        ValueError: if ``directory`` is not an existing directory.
    """
    if not os.path.isdir(directory):
        raise ValueError(f"{directory} is not a valid directory.")
    # Escape the directory so characters such as "[" or "*" in the path are matched
    # literally; only the "*.json" suffix should act as a pattern.
    search_path = os.path.join(glob.escape(directory), "*.json")
    # glob returns files in arbitrary, filesystem-dependent order; sort so that
    # trajectory order is reproducible across machines and runs.
    return sorted(glob.glob(search_path))
# normalization_params maps field_index -> {"mean": mean, "std": std}
def make_normalize(normalization_params, normalization_fields):
    """Build a function that z-score normalizes selected positional fields.

    Args:
        normalization_params: mapping from field index to a dict with ``"mean"`` and
            ``"std"`` entries. ``mean`` must expose a ``dtype`` (e.g. a tensor/array).
        normalization_fields: indices of the fields to normalize; all other fields
            are returned unchanged.

    Returns:
        A ``tf.function`` that takes an element's fields positionally and returns a
        list of fields. Each selected field is cast to its mean's dtype and mapped to
        ``(value - mean) / std``; wherever ``std`` is below 1e-3 the result is 0.
    """
    @tf.function
    def normalize(value, mean, std):
        # Near-constant components (tiny std) are zeroed instead of being blown up.
        has_spread = tf.less_equal(1e-3, std)
        return tf.where(has_spread, (value - mean) / std, 0., )

    @tf.function
    def normalize_fields(*args):
        outputs = []
        for index, field in enumerate(args):
            if index in normalization_fields:
                params = normalization_params[index]
                mean = params["mean"]
                outputs.append(normalize(tf.cast(field, mean.dtype), mean, params["std"]))
            else:
                outputs.append(field)
        return outputs

    return normalize_fields
def make_strided_lookahead_dataset(timestep_dataset, tuple_count, lookahead_steps, lookahead_fields):
    """Group a timestep dataset into non-overlapping chunks of ``lookahead_steps``.

    NOTE: this function used to be a second definition named ``make_lookahead_dataset``.
    The later sliding-window ``make_lookahead_dataset`` in this module rebound that name,
    so this variant was unreachable. It is renamed so the strided behavior can be used.

    Args:
        timestep_dataset: dataset whose elements are ``tuple_count``-tuples.
        tuple_count: number of fields in each element.
        lookahead_steps: chunk length (also the stride between chunks).
        lookahead_fields: indices of fields to replace by the chunk's stacked values.

    Returns:
        A dataset of ``tuple_count``-tuples, one per complete chunk (a trailing partial
        chunk is dropped). For the chunk starting at timestep ``k * lookahead_steps``:
        fields in ``lookahead_fields`` hold the chunk's values stacked along a leading
        axis of size ``lookahead_steps``; every other field holds the value at the first
        timestep of the chunk.
    """
    def make_split(index):
        @tf.function
        def split(*args):
            return args[index]
        return split

    fields = []
    for i in range(tuple_count):
        field = timestep_dataset.map(make_split(i))
        if i in lookahead_fields:
            # Each window holds exactly lookahead_steps elements, so batching with
            # drop_remainder=True yields one tensor with a static leading dimension.
            field = field.window(lookahead_steps, shift=lookahead_steps, stride=1, drop_remainder=True).flat_map(
                lambda window: window.batch(lookahead_steps, drop_remainder=True))
        else:
            # Keep every lookahead_steps-th element (indices 0, k, 2k, ...), i.e. the
            # first timestep of each chunk.
            field = field.shard(lookahead_steps, 0)
        fields.append(field)
    # zip stops at the shortest input, i.e. the number of complete chunks.
    return tf.data.Dataset.zip(*fields)
def make_lookahead_dataset(timestep_dataset, tuple_count, lookahead_steps, lookahead_fields):
    """Attach a sliding window of the next ``lookahead_steps`` values to selected fields.

    Args:
        timestep_dataset: dataset whose elements are ``tuple_count``-tuples.
        tuple_count: number of fields in each element.
        lookahead_steps: window length (the window advances one timestep at a time).
        lookahead_fields: indices of fields to replace by their window.

    Returns:
        A dataset of ``tuple_count``-tuples. Element ``t`` holds, for each field in
        ``lookahead_fields``, the values of timesteps ``t .. t + lookahead_steps - 1``
        stacked along a leading axis of size ``lookahead_steps``; every other field
        holds its value at timestep ``t``. Windows that would run past the end are
        dropped, giving ``len(timestep_dataset) - lookahead_steps + 1`` elements.
    """
    def make_split(index):
        @tf.function
        def split(*args):
            return args[index]
        return split

    fields = []
    for i in range(tuple_count):
        field = timestep_dataset.map(make_split(i))
        if i in lookahead_fields:
            # Every window holds exactly lookahead_steps elements, so batching with
            # drop_remainder=True yields one tensor with a static leading dimension.
            field = field.window(lookahead_steps, shift=1, stride=1, drop_remainder=True).flat_map(
                lambda window: window.batch(lookahead_steps, drop_remainder=True))
        # Non-lookahead fields pass through unchanged: the former
        # window(1, shift=1).flat_map(identity) was a no-op. zip below truncates them
        # to the number of complete windows.
        fields.append(field)
    return tf.data.Dataset.zip(*fields)
def make_lookbehind_dataset(timestep_dataset, tuple_count, lookbehind_steps, lookbehind_fields):
    """Attach a history window of the last ``lookbehind_steps`` values to selected fields.

    Args:
        timestep_dataset: dataset whose elements are ``tuple_count``-tuples.
        tuple_count: number of fields in each element.
        lookbehind_steps: history length, including the current timestep (>= 1).
        lookbehind_fields: indices of fields to replace by their history window.

    Returns:
        A dataset with one element per input timestep. For timestep ``t``, each field
        in ``lookbehind_fields`` holds the values of timesteps
        ``t - lookbehind_steps + 1 .. t`` (oldest first) stacked along a leading axis of
        size ``lookbehind_steps``; positions before the start of the trajectory are
        filled with the first timestep. Every other field holds its value at ``t``.

    Raises:
        ValueError: if ``lookbehind_steps`` is less than 1.
    """
    if lookbehind_steps < 1:
        # repeat(lookbehind_steps - 1) with a negative count would repeat forever.
        raise ValueError(f"lookbehind_steps must be >= 1, got {lookbehind_steps}")

    def make_split(index):
        @tf.function
        def split(*args):
            return args[index]
        return split

    # Prepend copies of the first timestep so early timesteps still get a full window.
    padding = timestep_dataset.take(1).repeat(lookbehind_steps - 1)
    padded_dataset = padding.concatenate(timestep_dataset)

    fields = []
    for i in range(tuple_count):
        split = make_split(i)
        if i in lookbehind_fields:
            # Window j over the padded stream ends at original timestep j.
            field = padded_dataset.map(split).window(lookbehind_steps, shift=1, stride=1, drop_remainder=True).flat_map(
                lambda window: window.batch(lookbehind_steps, drop_remainder=True))
        else:
            # Read current values from the unpadded stream so they line up with the
            # END of their window. Reusing the padded stream here (as before) paired
            # each window with its oldest entry instead of the current timestep.
            field = timestep_dataset.map(split)
        fields.append(field)
    return tf.data.Dataset.zip(*fields)
def make_downsampled_dataset(dataset, skip_count, obs_fields):
    """
    Downsample the dataset by a factor of ``skip_count``.

    The dataset is cut into non-overlapping groups of ``skip_count`` consecutive
    elements (a trailing partial group is dropped). Each group collapses to a single
    element whose ``obs_fields`` come from the group's first element and whose other
    fields (the actions) come from its last element, at index ``skip_count - 1``.
    """
    @tf.function
    def downsample(*windows):
        picked = []
        for index, window in enumerate(windows):
            if index in obs_fields:
                # First element of the group.
                picked.append(window.take(1))
            else:
                # Windows hold exactly skip_count elements, so this leaves only the last.
                picked.append(window.skip(skip_count - 1))
        return tf.data.Dataset.zip(*picked)

    grouped = dataset.window(skip_count, skip_count, drop_remainder=True)
    return grouped.flat_map(downsample)
# generally, this should go from 0.001 -> 0.02
# For the robot that I have, I'm going to start with beta_start=0.000001
# given the general sensitivity of the robot itself
def make_diffusion_dataset(dataset, num_steps, diffusion_fields, beta_start=0.000001, beta_interval=0.000001):
    """Replace selected fields by a chain of progressively noisier copies.

    The noise schedule is ``num_steps - 1`` variances ``betas`` evenly spaced from
    ``beta_start`` to ``beta_start + beta_interval * num_steps``. Starting from the
    original value ``x``, each step adds fresh Gaussian noise with variance
    ``betas[i - 1]`` to the previous step (a cumulative random walk; unlike DDPM the
    signal is not rescaled by ``sqrt(1 - beta)``).

    Args:
        dataset: dataset whose elements are tuples of tensors.
        num_steps: number of entries in each chain, including the original (>= 1).
        diffusion_fields: indices of the fields to diffuse.
        beta_start: first variance of the schedule.
        beta_interval: per-step increment used to derive the schedule's end value.

    Returns:
        A dataset with the same number of elements. Each field in ``diffusion_fields``
        becomes a tensor of shape ``(num_steps, *field.shape)`` ordered from the
        noisiest sample (index 0) to the original value (index -1); other fields are
        unchanged.

    Raises:
        ValueError: if ``num_steps`` is less than 1.
    """
    if num_steps < 1:
        raise ValueError(f"num_steps must be >= 1, got {num_steps}")
    beta_end = beta_start + beta_interval * num_steps
    betas = tf.linspace(beta_start, beta_end, num_steps - 1)
    # Standard deviations are fixed for the whole dataset; compute them once.
    sigmas = tf.sqrt(betas)

    @tf.function
    def diffusion_process(x):
        """Return ``tf.stack`` of the noising chain of ``x``, noisiest first."""
        diffusion_steps = [x]
        for i in range(1, num_steps):
            # Draw noise in x's own dtype so float64/float16 fields also work.
            sigma = tf.cast(sigmas[i - 1], x.dtype)
            noise = tf.random.normal(shape=tf.shape(x), mean=0.0, stddev=sigma, dtype=x.dtype)
            diffusion_steps.append(diffusion_steps[-1] + noise)
        # Order as noisiest -> original.
        diffusion_steps.reverse()
        return tf.stack(diffusion_steps)

    @tf.function
    def diffusion(*args):
        return [diffusion_process(arg) if (i in diffusion_fields) else arg for (i, arg) in enumerate(args)]

    return dataset.map(diffusion)
def make_diffusion_dataset_flattened(dataset, diffusion_steps, diffusion_fields, beta_start=0.000001, beta_interval=0.000001):
    """Expand each element into ``diffusion_steps`` elements along a noising chain.

    Previously this called a module-level ``diffusion_process`` that does not exist
    (it is only a closure inside ``make_diffusion_dataset``), which raised NameError
    when the map function was traced. The chain is now built locally, with the same
    default beta schedule as ``make_diffusion_dataset``.

    Args:
        dataset: dataset whose elements are tuples of tensors.
        diffusion_steps: chain length per element, including the original (>= 1).
        diffusion_fields: indices of the fields to diffuse.
        beta_start: first variance of the noise schedule.
        beta_interval: per-step increment used to derive the schedule's end value.

    Returns:
        A dataset with ``diffusion_steps`` consecutive elements per input element. In
        output ``k`` of a group, each field in ``diffusion_fields`` holds the k-th
        chain entry (k = 0 is the noisiest, the last is the original value); every
        other field holds its unchanged value.

    Raises:
        ValueError: if ``diffusion_steps`` is less than 1.
    """
    if diffusion_steps < 1:
        raise ValueError(f"diffusion_steps must be >= 1, got {diffusion_steps}")
    beta_end = beta_start + beta_interval * diffusion_steps
    sigmas = tf.sqrt(tf.linspace(beta_start, beta_end, diffusion_steps - 1))

    def diffusion_process(x):
        # Cumulative Gaussian random walk starting at x, stacked noisiest-first.
        chain = [x]
        for i in range(1, diffusion_steps):
            sigma = tf.cast(sigmas[i - 1], x.dtype)
            noise = tf.random.normal(shape=tf.shape(x), mean=0.0, stddev=sigma, dtype=x.dtype)
            chain.append(chain[-1] + noise)
        chain.reverse()
        return tf.stack(chain)

    @tf.function
    def diffusion(*args):
        # Non-diffused fields are tiled so every field has a leading axis of
        # diffusion_steps, which unbatch() then splits into separate elements.
        return [diffusion_process(arg) if (i in diffusion_fields) else tf.stack([arg] * diffusion_steps) for (i, arg) in enumerate(args)]

    return dataset.map(diffusion).unbatch()
import unittest
class TestDataset(unittest.TestCase):
    """Tests for the dataset utilities, mostly driven by synthetic trajectories."""

    @staticmethod
    def create_synthetic_timestep_dataset(size, image_size):
        """
        Create a synthetic timestep dataset with ``size`` random elements.

        Elements follow the ``make_timestep_dataset`` signature: a uint8 image of shape
        ``(*image_size, 3)``, two random 4x4 poses, and two zero gripper values.
        """
        values = []
        for _ in range(size):
            # Draw integer pixels directly. np.random.rand yields floats in [0, 1),
            # which truncate to 0 when converted to the uint8 signature, so every
            # synthetic image used to be black.
            image = np.random.randint(0, 256, size=(*image_size, 3), dtype=np.uint8)
            current_pose = np.random.rand(4, 4).astype(np.float32)
            desired_pose = np.random.rand(4, 4).astype(np.float32)
            current_gripper = np.array(0, dtype=np.float32)
            desired_gripper = np.array(0, dtype=np.float32)
            values.append((image, current_pose, desired_pose, current_gripper, desired_gripper))

        def generator():
            for value in values:
                yield value

        return tf.data.Dataset.from_generator(
            generator,
            output_signature=(
                tf.TensorSpec(shape=(None, None, None), dtype=tf.uint8),
                tf.TensorSpec(shape=(4, 4), dtype=tf.float32),
                tf.TensorSpec(shape=(4, 4), dtype=tf.float32),
                tf.TensorSpec(shape=(), dtype=tf.float32),
                tf.TensorSpec(shape=(), dtype=tf.float32)
            )
        )

    @classmethod
    def get_test_dir(cls):
        """Return the ``test_data`` directory next to this file."""
        current_file_path = os.path.realpath(__file__)
        base_dir = os.path.dirname(current_file_path)
        return os.path.join(base_dir, "test_data")

    @classmethod
    def get_trajectory_files(cls, data_dir):
        """Return the JSON trajectory files in ``data_dir/trajectories``."""
        data_dir = os.path.join(data_dir, "trajectories")
        json_files = get_files(data_dir)
        return json_files

    @classmethod
    def get_test_trajectory_files(cls):
        return cls.get_trajectory_files(cls.get_test_dir())

    @classmethod
    def get_image_dir(cls, data_dir):
        """Return ``data_dir/images``."""
        return os.path.join(data_dir, "images")

    @classmethod
    def get_test_image_dir(cls):
        return cls.get_image_dir(cls.get_test_dir())

    def check_timesteps_shape(self, timestep_dataset):
        """Assert the dataset is non-empty and every element has the expected shapes."""
        count = 0
        for image, current_pose, desired_pose, current_gripper, desired_gripper in timestep_dataset:
            count += 1
            self.assertEqual(image.shape, (100, 100, 3), "Image should have shape (100, 100, 3)")
            self.assertEqual(current_pose.shape, (4, 4), "Current pose should have shape (4, 4)")
            self.assertEqual(desired_pose.shape, (4, 4), "Desired pose should have shape (4, 4)")
        self.assertGreater(count, 0)

    def check_lookahead_timestep_shape(self, lookahead_count, timestep_dataset):
        """Like ``check_timesteps_shape`` but expects windowed desired poses."""
        count = 0
        for image, current_pose, desired_pose, current_gripper, desired_gripper in timestep_dataset:
            count += 1
            self.assertEqual(image.shape, (100, 100, 3), "Image should have shape (100, 100, 3)")
            self.assertEqual(current_pose.shape, (4, 4), "Current pose should have shape (4, 4)")
            # The message previously claimed (4, 4) although a window is expected.
            self.assertEqual(desired_pose.shape, (lookahead_count, 4, 4), f"Desired pose should have shape ({lookahead_count}, 4, 4)")
        self.assertGreater(count, 0)

    # NOTE(review): the commented-out tests below predate the current signatures, e.g.
    # make_lookahead_dataset now requires tuple_count, and make_diffusion_dataset takes
    # num_steps rather than diffusion_steps. The JSON-based ones also need a test_data/
    # directory next to this file. Update them before re-enabling.

    # def test_greyscale_image_generation(self):
    #     json_files = self.get_test_trajectory_files()
    #     self.assertTrue(len(json_files) > 0, "There should be at least one trajectory file")
    #     trajectory_dataset = flatten_nested_dataset(
    #         make_trajectory_dataset([
    #             make_timestep_dataset(make_json_parser(json_file, self.get_test_image_dir(), (100, 100), greyscale_images=True))
    #             for json_file in json_files
    #         ])
    #     )
    #     for image, _, _, _, _ in trajectory_dataset:
    #         self.assertEqual(image.shape[-1], 1, f"Image should be greyscale. found shape {image.shape}")

    # def test_json_dataset(self):
    #     json_files = self.get_test_trajectory_files()
    #     self.assertTrue(len(json_files) > 0, "There should be at least one trajectory file")
    #     trajectory_dataset = flatten_nested_dataset(make_trajectory_dataset([make_timestep_dataset(make_json_parser(json_file, self.get_test_image_dir(), (100, 100))) for json_file in json_files]))
    #     self.check_timesteps_shape(trajectory_dataset)

    # def test_synthetic_timestep_dataset(self):
    #     synthetic_dataset = self.create_synthetic_timestep_dataset(5, (100, 100))
    #     self.check_timesteps_shape(synthetic_dataset)

    # def test_make_lookahead_dataset(self):
    #     dataset_length = 5
    #     lookahead_steps = 2
    #     single_timestep_dataset = self.create_synthetic_timestep_dataset(dataset_length, (100, 100))
    #     single_timestep_data = []
    #     for data in single_timestep_dataset:
    #         single_timestep_data.append(data)
    #     future_dataset = make_lookahead_dataset(single_timestep_dataset, lookahead_steps=lookahead_steps, lookahead_fields=[2])
    #     loop_count = 0
    #     self.check_lookahead_timestep_shape(lookahead_steps, future_dataset)
    #     for i, (future_image, future_current_pose, future_desired_poses, future_current_gripper, future_desired_gripper) in enumerate(future_dataset):
    #         image = single_timestep_data[i][0]
    #         current_pose = single_timestep_data[i][1]
    #         desired_poses = np.array([single_timestep_data[i + j][2] for j in range(lookahead_steps)])
    #         self.assertTrue(np.array_equal(future_image, image))
    #         self.assertTrue(np.array_equal(future_current_pose, current_pose))
    #         self.assertTrue(np.array_equal(future_desired_poses, desired_poses))
    #         loop_count += 1
    #     self.assertEqual(loop_count, dataset_length - lookahead_steps + 1)

    # def test_make_diffusion_dataset(self):
    #     dataset_length = 5
    #     diffusion_steps = 10
    #     single_timestep_dataset = self.create_synthetic_timestep_dataset(dataset_length, (100, 100))
    #     diffusion_dataset = make_diffusion_dataset(single_timestep_dataset, diffusion_steps=diffusion_steps, diffusion_fields=[2])
    #     loop_count = 0
    #     for _, _, diffusion_desired_poses, _, _ in diffusion_dataset:
    #         self.assertEqual(diffusion_desired_poses.shape, (diffusion_steps, 4, 4))
    #         loop_count += 1
    #     self.assertEqual(loop_count, dataset_length)

    def test_make_nested_diffusion_dataset(self):
        """Flattened diffusion expands each timestep into ``diffusion_steps`` elements."""
        dataset_length = 5
        diffusion_steps = 10
        single_timestep_dataset = self.create_synthetic_timestep_dataset(dataset_length, (100, 100))
        # The previously called make_diffusion_nested_dataset does not exist;
        # make_diffusion_dataset_flattened has the matching signature.
        diffusion_dataset = make_diffusion_dataset_flattened(single_timestep_dataset, diffusion_steps=diffusion_steps, diffusion_fields=[2])
        loop_count = 0
        for image, current_pose, desired_pose, _, _ in diffusion_dataset:
            self.assertEqual(image.shape, (100, 100, 3))
            self.assertEqual(current_pose.shape, (4, 4))
            self.assertEqual(desired_pose.shape, (4, 4))
            loop_count += 1
        self.assertEqual(loop_count, dataset_length * diffusion_steps)

    # def test_make_lookahead_diffusion_dataset(self):
    #     dataset_length = 5
    #     lookahead_steps = 2
    #     diffusion_steps = 10
    #     single_timestep_dataset = self.create_synthetic_timestep_dataset(dataset_length, (100, 100))
    #     lookahead_dataset = make_lookahead_dataset(single_timestep_dataset, lookahead_steps=lookahead_steps, lookahead_fields=[2])
    #     diffusion_dataset = make_diffusion_dataset(lookahead_dataset, diffusion_steps=diffusion_steps, diffusion_fields=[2])
    #     loop_count = 0
    #     for _, _, diffusion_desired_poses, _, _ in diffusion_dataset:
    #         self.assertEqual(diffusion_desired_poses.shape, (diffusion_steps, lookahead_steps, 4, 4))
    #         loop_count += 1
    #     self.assertEqual(loop_count, dataset_length - lookahead_steps + 1)

    # def test_generate_trajectory_unflattened_dataset(self):
    #     single_timestep_datasets = [self.create_synthetic_timestep_dataset(5, (100, 100)) for _ in range(5)]
    #     future_steps = 2
    #     future_datasets = [make_lookahead_dataset(single_timestep_dataset, lookahead_steps=future_steps, lookahead_fields=[2]) for single_timestep_dataset in single_timestep_datasets]
    #     trajectory_dataset = make_trajectory_dataset(future_datasets)
    #     for timesteps in trajectory_dataset:
    #         self.check_lookahead_timestep_shape(future_steps, timesteps)

    # def test_generate_trajectory_flattened_dataset(self):
    #     single_timestep_datasets = [self.create_synthetic_timestep_dataset(5, (100, 100)) for _ in range(5)]
    #     future_steps = 2
    #     future_datasets = [make_lookahead_dataset(single_timestep_dataset, lookahead_steps=future_steps, lookahead_fields=[2]) for single_timestep_dataset in single_timestep_datasets]
    #     trajectory_dataset = make_trajectory_dataset(future_datasets)
    #     flattened_dataset = flatten_nested_dataset(trajectory_dataset)
    #     self.check_lookahead_timestep_shape(future_steps, flattened_dataset)
if __name__ == "__main__":
    # Executing this file directly runs the TestDataset suite.
    unittest.main()
@vyeevani
Copy link
Author

This code really needs to be cleaned up, but most of it has been tested. I've been using the `*_3` functions extensively for my current trajectory-modeling work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment