TriFingerDatasetEnv

class trifinger_rl_datasets.TriFingerDatasetEnv(name, dataset_url, ref_max_score, ref_min_score, trifinger_kwargs, real_robot=False, image_obs=False, visualization=False, obs_to_keep=None, flatten_obs=True, scale_obs=False, set_terminals=False, data_dir=None, **kwargs)[source]

TriFinger environment which can load an offline RL dataset from a file.

Similar to D4RL’s OfflineEnv but with different data loading and options for customization of the observation space.

Parameters:
  • name (str) – Name of the dataset.

  • dataset_url (str) – URL pointing to the dataset.

  • ref_max_score (float) – Maximum score (for score normalization)

  • ref_min_score (float) – Minimum score (for score normalization)

  • trifinger_kwargs (dict) – Keyword arguments for underlying SimTriFingerCubeEnv environment.

  • real_robot (bool) – Whether the data was collected on real robots.

  • image_obs (bool) – Whether observations contain camera images.

  • visualization (bool) – Enables rendering for simulated environment.

  • obs_to_keep (dict) – Dictionary with the same structure as the observation of SimTriFingerCubeEnv. The boolean value of each item indicates whether it should be included in the observation. If None, the complete observation of SimTriFingerCubeEnv is used.

  • flatten_obs (bool) – Whether to flatten the observation. Can be combined with obs_to_keep.

  • scale_obs (bool) – Whether to scale all components of the observation to the interval [-1, 1]. Only implemented for flattened observations.

  • set_terminals (bool) – Whether to set the terminals instead of the timeouts.

  • data_dir (str or Path) – Directory where the dataset is stored. If None, the default data directory (~/.trifinger_rl_datasets) is used.
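The scale_obs option maps each observation component to the interval [-1, 1]. A minimal sketch of such a scaling, assuming standard min-max normalization against the observation-space bounds (the exact formula used internally is not shown here, and the bounds below are purely illustrative):

```python
import numpy as np

def scale_to_unit_interval(obs, low, high):
    """Min-max scale a flattened observation to [-1, 1].

    obs, low, high: 1D arrays of equal length; low and high are the
    lower and upper bounds of the observation space.
    """
    return 2.0 * (obs - low) / (high - low) - 1.0

# Example with hypothetical bounds:
low = np.array([0.0, -1.0, -2.0])
high = np.array([1.0, 1.0, 2.0])
obs = np.array([0.5, 0.0, 2.0])
scaled = scale_to_unit_interval(obs, low, high)
```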

compute_reward(achieved_goal, desired_goal, info)[source]

Compute the reward for the given achieved and desired goal.

Parameters:
  • achieved_goal (dict) – Current pose of the object.

  • desired_goal (dict) – Goal pose of the object.

  • info (dict) – An info dictionary containing a field “time_index” which contains the time index of the achieved_goal.

Returns:

The reward that corresponds to the provided achieved goal with respect to the desired goal.

Return type:

float

convert_timestep_to_image_index(timesteps, zarr_path=None)[source]

Convert camera timesteps to image indices.

Parameters:
  • timesteps (ndarray) – Array of camera timesteps.

  • zarr_path (str | PathLike | None) –

Returns:

Array of image indices.

Return type:

ndarray

get_dataset(zarr_path=None, clip=True, rng=None, indices=None, n_threads=None)[source]

Get the dataset.

When called for the first time, the dataset is automatically downloaded and saved to ~/.trifinger_rl_datasets.

Parameters:
  • zarr_path (str | PathLike | None) – Optional path to a Zarr directory containing the dataset, which will be used instead of the default.

  • clip (bool) – If True, observations are clipped to be within the environment’s observation space.

  • rng (Tuple[int, int] | None) – Optional range to return. rng=(m,n) means that observations, actions and rewards m to n-1 are returned. If not specified, the entire dataset is returned.

  • indices (ndarray | None) – Optional array of timestep indices for which to load data. rng and indices are mutually exclusive, only one of them can be set.

  • n_threads (int | None) – Number of threads to use for processing the images. If None, the number of threads is set to the number of CPUs available to the process.

Returns:

A dictionary containing the following keys

  • observations: Either an array or a list of dictionaries containing the observations depending on whether flatten_obs is True or False.

  • actions: Array containing the actions.

  • rewards: Array containing the rewards.

  • timeouts: Array containing the timeouts (True only at the end of an episode by default. Always False if set_terminals is True).

  • terminals: Array containing the terminals (Always False by default. If set_terminals is True, only True at the last timestep of an episode).

  • images (only if present in dataset): Array of the shape (n_control_timesteps, n_cameras, n_channels, height, width) containing the image data. The channels are ordered as RGB.

Return type:

Dict[str, Any]
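Since timeouts is True only at the last timestep of each episode (with the default set_terminals=False), episode boundaries can be recovered from the returned dictionary. A self-contained sketch using a mock dataset, where the arrays stand in for what get_dataset() would return:

```python
import numpy as np

# Mock of the relevant part of a get_dataset() result:
# two episodes of lengths 3 and 2.
dataset = {
    "rewards": np.array([0.1, 0.2, 0.3, 0.4, 0.5]),
    "timeouts": np.array([False, False, True, False, True]),
}

# Episode end indices are the positions where timeouts is True.
ends = np.where(dataset["timeouts"])[0]
starts = np.concatenate(([0], ends[:-1] + 1))

# Sum the rewards of each episode.
episode_returns = [
    dataset["rewards"][s : e + 1].sum() for s, e in zip(starts, ends)
]
```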

get_dataset_stats(zarr_path=None)[source]

Get statistics of the dataset, such as the number of timesteps.

Parameters:

zarr_path (str | PathLike | None) – Optional path to a Zarr directory containing the dataset, which will be used instead of the default.

Returns:

The statistics of the dataset as a dictionary with keys

  • n_timesteps: Number of timesteps in dataset. Corresponds to the number of observations, actions and rewards.

  • obs_size: Size of the observation vector.

  • action_size: Size of the action vector.

Return type:

Dict

get_image_data(rng=None, indices=None, zarr_path=None, timestep_dimension=True, n_threads=None)[source]

Get image data from dataset.

Parameters:
  • rng (Tuple[int, int] | None) – Optional range of images to return. rng=(m,n) means that the images with indices m to n-1 are returned.

  • indices (ndarray | None) – Optional array of image indices for which to load data. rng and indices are mutually exclusive, only one of them can be set.

  • zarr_path (str | PathLike | None) – Optional path to a Zarr directory containing the dataset, which will be used instead of the default.

  • timestep_dimension (bool) – Whether to include the timestep dimension in the returned array. This is useful if the given range or indices always contain n_cameras consecutive image indices corresponding to the camera images of one camera timestep. If this assumption is violated, the first dimension no longer corresponds to camera timesteps.

  • n_threads (int | None) – Number of threads to use for processing the images. If None, the number of threads is set to the number of CPUs available to the process.

Returns:

The image data (or a part of it specified by rng or indices) as a numpy array. If timestep_dimension is True the shape will be (n_camera_timesteps, n_cameras, n_channels, height, width) else (n_images, n_channels, height, width). The channels are ordered as RGB.

Return type:

ndarray
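The two output shapes are related by a simple reshape: with timestep_dimension=True, the leading axis groups the n_cameras images of each camera timestep. A sketch with dummy data (the array contents and the 270x270 resolution are assumptions for illustration):

```python
import numpy as np

n_camera_timesteps, n_cameras = 4, 3
n_channels, height, width = 3, 270, 270

# Shape as returned with timestep_dimension=True.
with_timesteps = np.zeros(
    (n_camera_timesteps, n_cameras, n_channels, height, width), dtype=np.uint8
)

# Collapsing the first two axes yields the timestep_dimension=False layout,
# i.e. (n_images, n_channels, height, width).
flat = with_timesteps.reshape(-1, n_channels, height, width)
```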

get_image_stats(zarr_path=None)[source]

Get statistics of the image data in the dataset.

Parameters:

zarr_path (str | PathLike | None) – Optional path to a Zarr directory containing the dataset, which will be used instead of the default.

Returns:

The statistics of the image data as a dictionary with keys

  • n_images: Number of images in the dataset.

  • n_cameras: Number of cameras used to capture the images.

  • n_channels: Number of channels in the images.

  • image_shape: Shape of the images in the format (height, width).

  • reorder_pixels: Whether the pixels in the images have been reordered to have the pixels corresponding to one color in the Bayer pattern together in blocks (to improve image compression).

Return type:

Dict

get_obs_indices()[source]

Get index ranges that correspond to the different observation components.

Also returns a dictionary containing the shapes of these observation components.

Returns:

  • A dictionary with keys corresponding to the observation components and values being tuples of the form (start, end), where start and end are the indices at which the observation component starts and ends. The nested dictionary structure of the observation is preserved.

  • A dictionary of the same structure but with values being the shapes of the observation components.

Return type:

Tuple[Dict, Dict]
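The returned index ranges can be used to extract individual components from a flattened observation. A sketch with a hypothetical, much simplified indices dictionary; the real one mirrors the nested structure of the SimTriFingerCubeEnv observation:

```python
import numpy as np

# Hypothetical output of get_obs_indices() for a tiny observation
# (the keys and ranges here are illustrative, not the real ones):
obs_indices = {
    "robot": {"position": (0, 9), "velocity": (9, 18)},
    "object": {"position": (18, 21)},
}

flat_obs = np.arange(21, dtype=np.float64)  # stand-in for a real observation

def extract(flat_obs, index_range):
    """Slice the component given by a (start, end) index range."""
    start, end = index_range
    return flat_obs[start:end]

joint_positions = extract(flat_obs, obs_indices["robot"]["position"])
object_position = extract(flat_obs, obs_indices["object"]["position"])
```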

render(mode='human')[source]

Does not do anything for this environment.

Parameters:

mode (str) –

reset()[source]

Reset the environment.

Returns:

Tuple of observation and info dictionary.

Return type:

Tuple[Dict | ndarray, Dict]

reset_fingers(reset_wait_time=3000)[source]

Move the fingers to their initial position.

This resets neither the frontend nor the cube. This method is intended for ‘soft resets’ between episodes within one job.

Parameters:

reset_wait_time (int) –

seed(seed=None)[source]

Set random seed of the environment.

Parameters:

seed (int | None) –

Return type:

List[int]

step(action, **kwargs)[source]

Execute one step.

Parameters:

action (ndarray) – Array of 9 torque commands, one for each robot joint.

Returns:

A tuple with

  • observation (dict or tuple): agent’s observation of the current environment. If self.flatten_obs is False then as a dictionary. If self.flatten_obs is True then either as a 1D NumPy array (if no images are to be included) or as a tuple (if images are to be included) consisting of

    • a 1D NumPy array containing all observations except the camera images, and

    • a NumPy array of shape (n_cameras, n_channels, height, width) containing the camera images.

  • reward (float): amount of reward returned after previous action.

  • terminated (bool): whether the MDP has reached a terminal state. If true, the user needs to call reset().

  • truncated (bool): Whether the truncation condition outside the scope of the MDP is satisfied. For this environment this corresponds to a timeout. If true, the user needs to call reset().

  • info (dict): info dictionary containing the current time index.

Return type:

Tuple[Dict | ndarray, float, bool, bool, Dict]
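step follows the Gymnasium five-tuple convention, so a policy rollout loops until terminated or truncated is True. A self-contained sketch with a dummy environment standing in for TriFingerDatasetEnv (the dummy simply truncates after a fixed number of steps; a real environment would be obtained via gymnasium.make):

```python
import numpy as np

class DummyEnv:
    """Minimal stand-in exposing the same reset/step interface."""

    def __init__(self, episode_length=5):
        self.episode_length = episode_length
        self.t = 0

    def reset(self):
        self.t = 0
        return np.zeros(3), {"time_index": self.t}

    def step(self, action):
        self.t += 1
        obs = np.zeros(3)
        reward = 1.0
        terminated = False  # no terminal states in this dummy
        truncated = self.t >= self.episode_length  # timeout
        info = {"time_index": self.t}
        return obs, reward, terminated, truncated, info

env = DummyEnv()
obs, info = env.reset()
total_reward = 0.0
while True:
    action = np.zeros(9)  # a real policy would map obs to 9 joint torques
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    if terminated or truncated:
        break
```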