from pathlib import Path
from time import sleep, time
from typing import Tuple, Dict, Any, Optional
import logging
import cv2
import gymnasium as gym
import numpy as np
from pybullet import ER_TINY_RENDERER
import trifinger_simulation
import trifinger_simulation.visual_objects
from trifinger_simulation import trifingerpro_limits
import trifinger_simulation.tasks.move_cube as task
from .sampling_utils import sample_initial_cube_pose
from .utils import to_quat, get_keypoints_from_pose
class CameraWrapper:
"""Simple wrapper around camera array to change default renderer."""
def __init__(self, camera):
self.camera = camera
def get_images(self, renderer=ER_TINY_RENDERER):
return self.camera.get_images(renderer)
[docs]class SimTriFingerCubeEnv(gym.Env):
"""
Gym environment for simulated manipulation of a cube with a TriFingerPro platform.
"""
_initial_finger_position = [0.0, 0.9, -2.0] * 3
_max_fingertip_vel = 5.0
# parameters of reward function
_kernel_reward_weight = 4.0
_logkern_scale = 30
_logkern_offset = 2
# for how long to play the resetting trajectory
_reset_trajectory_length = 18700
# how many robot steps per environment step
_step_size = 20
def __init__(
self,
episode_length: int = 15,
difficulty: int = 4,
keypoint_obs: bool = True,
obs_action_delay: int = 0,
reward_type: str = "dense",
visualization: bool = False,
real_time: bool = True,
image_obs: bool = False,
camera_config_robot: int = 1,
):
"""
Args:
episode_length (int): How often step will run before done is True.
keypoint_obs (bool): Whether to give keypoint observations for
pose in addition to position and quaternion.
obs_action_delay (int): Delay between arrival of an observation
and application of the action computed from this
observation in milliseconds.
reward_type (str): Which reward to use. Can be 'dense' or 'sparse'.
visualization (bool): If true, the PyBullet GUI is run for visualization.
real_time (bool): If true, the environment is stepped in real
time instead of as fast as possible (ignored if visualization is
disabled).
image_obs (bool): If true, the camera images are returned as part
of the observation.
camera_config_robot (int): ID of the robot to retrieve camera
configs from. Only used if image_obs is True.
"""
# Basic initialization
# ====================
self.logger = logging.getLogger("trifinger_rl_datasets.SimTriFingerCubeEnv")
assert (
obs_action_delay < self._step_size
), "Delay between retrieval of observation and sending of next \
action has to be smaller than step size (20 ms)."
# will be initialized in reset()
self.platform: Optional[trifinger_simulation.TriFingerPlatform] = None
self.episode_length = episode_length
self.difficulty = difficulty
self.keypoint_obs = keypoint_obs
self.n_keypoints = 8
self.obs_action_delay = obs_action_delay
self.reward_type = reward_type
self.visualization = visualization
self.real_time = real_time
self.image_obs = image_obs
self.camera_config_robot = camera_config_robot
# load trajectory that is played back for resetting the cube
trajectory_file_path = (
Path(__file__).resolve().parent
/ "data"
/ "trifingerpro_shuffle_cube_trajectory_fast.npy"
)
with open(trajectory_file_path, "rb") as f:
self._cube_reset_traj = np.load(f)
# simulated robot has robot ID 0
self.robot_id = 0
if image_obs:
# create camera object
camera_config_dir = Path(__file__).resolve().parent / "data"
calib_filename_pattern = f"r{self.camera_config_robot}_" + "camera{id}.yml"
self.camera = trifinger_simulation.camera.create_trifinger_camera_array_from_config(
camera_config_dir, calib_filename_pattern=calib_filename_pattern
)
else:
self.camera = None
# Create the action and observation spaces
# ========================================
robot_torque_space = gym.spaces.Box(
low=trifingerpro_limits.robot_torque.low,
high=trifingerpro_limits.robot_torque.high,
)
robot_position_space = gym.spaces.Box(
low=trifingerpro_limits.robot_position.low,
high=trifingerpro_limits.robot_position.high,
dtype=np.float32,
)
robot_velocity_space = gym.spaces.Box(
low=trifingerpro_limits.robot_velocity.low,
high=trifingerpro_limits.robot_velocity.high,
dtype=np.float32,
)
robot_fingertip_force_space = gym.spaces.Box(
low=np.zeros(trifingerpro_limits.n_fingers),
high=np.ones(trifingerpro_limits.n_fingers),
dtype=np.float32,
)
robot_fingertip_pos_space = gym.spaces.Box(
low=np.array([[-0.6, -0.6, 0.0]] * trifingerpro_limits.n_fingers),
high=np.array([[0.6, 0.6, 0.6]] * trifingerpro_limits.n_fingers),
dtype=np.float32,
)
robot_fingertip_vel_space = gym.spaces.Box(
low=np.array(
[[-self._max_fingertip_vel] * 3] * trifingerpro_limits.n_fingers
),
high=np.array(
[[self._max_fingertip_vel] * 3] * trifingerpro_limits.n_fingers
),
dtype=np.float32,
)
robot_id_space = gym.spaces.Box(low=0, high=20, shape=(1,), dtype=np.int_)
# camera observation space
camera_obs_space_dict: Dict[str, gym.Space] = {
"object_position": gym.spaces.Box(
low=trifingerpro_limits.object_position.low,
high=trifingerpro_limits.object_position.high,
dtype=np.float32,
),
"object_orientation": gym.spaces.Box(
low=trifingerpro_limits.object_orientation.low,
high=trifingerpro_limits.object_orientation.high,
dtype=np.float32,
),
"delay": gym.spaces.Box(low=0.0, high=0.30, shape=(1,), dtype=np.float32),
"confidence": gym.spaces.Box(
low=0.0, high=1.0, shape=(1,), dtype=np.float32
),
}
if self.keypoint_obs:
camera_obs_space_dict["object_keypoints"] = gym.spaces.Box(
low=np.array([[-0.6, -0.6, 0.0]] * self.n_keypoints),
high=np.array([[0.6, 0.6, 0.3]] * self.n_keypoints),
dtype=np.float32,
)
if self.image_obs:
n_cameras = len(self.camera.cameras)
images_space = gym.spaces.Box(
low=0,
high=255,
shape=(
n_cameras,
3,
self.camera.cameras[0]._output_height,
self.camera.cameras[0]._output_width,
),
dtype=np.uint8,
)
camera_obs_space_dict["images"] = images_space
camera_obs_space = gym.spaces.Dict(camera_obs_space_dict)
# goal space
if self.difficulty == 4:
if self.keypoint_obs:
goal_space = gym.spaces.Dict(
{"object_keypoints": camera_obs_space["object_keypoints"]}
)
else:
goal_space = gym.spaces.Dict(
{
k: camera_obs_space[k]
for k in ["object_position", "object_orientation"]
}
)
else:
goal_space = gym.spaces.Dict(
{"object_position": camera_obs_space["object_position"]}
)
# action space
self.action_space = robot_torque_space
self._initial_action = trifingerpro_limits.robot_torque.default
# NOTE: The order of dictionary items matters as it determines how
# the observations are flattened/unflattened. The observation space
# is therefore sorted by key.
def sort_by_key(d):
return {
k: (
gym.spaces.Dict(sort_by_key(v.spaces))
if isinstance(v, gym.spaces.Dict)
else v
)
for k, v in sorted(d.items(), key=lambda item: item[0])
}
# complete observation space
self.observation_space = gym.spaces.Dict(
sort_by_key(
{
"robot_observation": gym.spaces.Dict(
{
"position": robot_position_space,
"velocity": robot_velocity_space,
"torque": robot_torque_space,
"fingertip_force": robot_fingertip_force_space,
"fingertip_position": robot_fingertip_pos_space,
"fingertip_velocity": robot_fingertip_vel_space,
"robot_id": robot_id_space,
}
),
"camera_observation": camera_obs_space,
"action": self.action_space,
"desired_goal": goal_space,
"achieved_goal": goal_space,
}
)
)
self._old_camera_obs: Optional[Dict[str, Any]] = None
self.t_obs: int = 0
# Count consecutive steps where timing is violated (to decide when the show a
# warning)
self._timing_violation_counter = 0
def _kernel_reward(
self, achieved_goal: np.ndarray, desired_goal: np.ndarray
) -> float:
"""Compute reward by evaluating a logistic kernel on the pairwise distance of
points.
Parameters can be either a 1 dim. array of size 3 (positions) or a two dim.
array with last dim. of size 3 (keypoints)
Args:
achieved_goal: Position or keypoints of current pose of the object.
desired_goal: Position or keypoints of goal pose of the object.
"""
diff = achieved_goal - desired_goal
dist = np.linalg.norm(diff, axis=-1)
scaled = self._logkern_scale * dist
# Use logistic kernel
rew = self._kernel_reward_weight * np.mean(
1.0 / (np.exp(scaled) + self._logkern_offset + np.exp(-scaled))
)
return rew
def _append_desired_action(self, robot_action):
"""Append desired action to queue and wait if real time is enabled."""
t = self.platform.append_desired_action(robot_action)
if self.visualization and self.real_time:
sleep(max(0.001 - (time() - self.time_of_last_step), 0.0))
self.time_of_last_step = time()
return t
[docs] def compute_reward(
self, achieved_goal: dict, desired_goal: dict, info: dict
) -> float:
"""Compute the reward for the given achieved and desired goal.
Args:
achieved_goal: Current pose of the object.
desired_goal: Goal pose of the object.
info: An info dictionary containing a field "time_index" which
contains the time index of the achieved_goal.
Returns:
The reward that corresponds to the provided achieved goal w.r.t. to
the desired goal.
"""
if self.reward_type == "dense":
if self.difficulty == 4:
# Use full keypoints if available as only difficulty 4 considers
# orientation
return self._kernel_reward(
achieved_goal["object_keypoints"], desired_goal["object_keypoints"]
)
else:
# use position for all other difficulties
return self._kernel_reward(
achieved_goal["object_position"], desired_goal["object_position"]
)
elif self.reward_type == "sparse":
return self.has_achieved(achieved_goal, desired_goal)
else:
raise NotImplementedError(
f"Reward type {self.reward_type} is not supported"
)
[docs] def has_achieved(self, achieved_goal: dict, desired_goal: dict) -> bool:
"""Determine whether goal pose is achieved."""
POSITION_THRESHOLD = 0.02
ANGLE_THRESHOLD_DEG = 22.0
desired = desired_goal
achieved = achieved_goal
position_diff = np.linalg.norm(
desired["object_position"] - achieved["object_position"]
)
# cast from np.bool_ to bool to make mypy happy
position_check = bool(position_diff < POSITION_THRESHOLD)
if self.difficulty < 4:
return position_check
else:
a = to_quat(desired["object_orientation"])
b = to_quat(achieved["object_orientation"])
b_conj = b.conjugate()
quat_prod = a * b_conj
norm = np.linalg.norm([quat_prod.x, quat_prod.y, quat_prod.z])
norm = min(norm, 1.0) # type: ignore
angle = 2.0 * np.arcsin(norm)
orientation_check = angle < 2.0 * np.pi * ANGLE_THRESHOLD_DEG / 360.0
return position_check and orientation_check
def _check_action(self, action):
low_check = self.action_space.low <= action
high_check = self.action_space.high >= action
return np.all(np.logical_and(low_check, high_check))
[docs] def step(
self, action: np.ndarray, preappend_actions: bool = True
) -> Tuple[dict, float, bool, bool, dict]:
"""Run one timestep of the environment's dynamics.
When end of episode is reached, you are responsible for calling
``reset()`` to reset this environment's state.
Args:
action: An action provided by the agent
preappend_actions (bool): Whether to already append actions that
will be executed during obs-action delay to action queue.
Returns:
tuple:
- observation (dict): agent's observation of the current environment.
- reward (float): amount of reward returned after previous action.
- terminated (bool): whether the MDP has reached a terminal state. If true,
the user needs to call `reset()`.
- truncated (bool): Whether the truncation condition outside the scope
of the MDP is satisfied. For this environment this corresponds to a
timeout. If true, the user needs to call `reset()`.
- info (dict): info dictionary containing the current time index.
"""
if self.platform is None:
raise RuntimeError("Call `reset()` before starting to step.")
if not self._check_action(action):
raise ValueError("Given action is not contained in the action space.")
self.step_count += 1
# get robot action
robot_action = self._gym_action_to_robot_action(action)
# check timing and show a warning/error if delayed
# do not check in first iteration as no time index is available yet (would lead
# to dead-lock)
if self.t_obs > 0:
t_now = self.platform.get_current_timeindex()
t_expected = self.t_obs + self.obs_action_delay
if t_now > t_expected:
self._timing_violation_counter += 1
extreme = t_now > self.t_obs + self._step_size
if extreme or self._timing_violation_counter >= 3:
delay = t_now - t_expected
self.logger.warning(
f"Control loop got delayed by {delay} ms."
" The action will be applied for a shorter time to catch up."
" Please check if your policy is fast enough (max. computation"
f" time should be <{1 + self.obs_action_delay} ms)."
)
if extreme:
self.logger.error(
"ERROR: Control loop got delayed by more than a full step."
" Timing of the episode will be significantly affected!"
)
else:
self._timing_violation_counter = 0
# send new action to robot until new observation is to be provided
# Note that by initially setting t the way it is, it is ensured that the loop
# always runs at least one iteration, even if the actual time step is already
# ahead by more than one step size.
t = self.t_obs + self.obs_action_delay
while t < self.t_obs + self._step_size:
t = self._append_desired_action(robot_action)
# time of the new observation
self.t_obs = t
observation, info = self._create_observation(self.t_obs, action)
reward = self.compute_reward(
observation["achieved_goal"], observation["desired_goal"], info
)
truncated = self.step_count >= self.episode_length
if not truncated and preappend_actions:
t_now = self.platform.get_current_timeindex()
# Append action to action queue of robot for as many time
# steps as the obs_action_delay dictates. This gives the
# user time to evaluate the policy.
# Also take time into account that might have already passed
# while the observation was processed.
for _ in range(max(self.obs_action_delay - (t_now - self.t_obs), 0)):
self._append_desired_action(robot_action)
return observation, reward, False, truncated, info
[docs] def reset( # type: ignore
self, preappend_actions: bool = True
):
"""Reset the environment."""
super().reset()
# hard-reset simulation
del self.platform
# initialize simulation
initial_robot_position = trifingerpro_limits.robot_position.default
initial_object_pose = sample_initial_cube_pose()
initial_object_pose.position[2] += 0.0005 # avoid negative z of keypoint
self.platform = trifinger_simulation.TriFingerPlatform(
visualization=self.visualization,
initial_robot_position=initial_robot_position,
initial_object_pose=initial_object_pose,
enable_cameras=self.image_obs,
)
if self.image_obs:
# overwrite camera with wrapped version which uses software rendering
self.platform.tricamera = CameraWrapper(self.camera)
first_camera_obs = self.platform._get_current_camera_observation(0)
self.platform._delayed_camera_observation = first_camera_obs
self.platform._camera_observation_t = first_camera_obs
# sample goal
self.active_goal = task.sample_goal(difficulty=self.difficulty)
# visualize the goal (but not if image observations are used)
if self.visualization and not self.image_obs:
self.goal_marker = trifinger_simulation.visual_objects.CubeMarker(
width=task._CUBE_WIDTH,
position=self.active_goal.position,
orientation=self.active_goal.orientation,
pybullet_client_id=self.platform.simfinger._pybullet_client_id,
)
self.step_count = 0
self.time_of_last_step = time()
# need to already do one step to get initial observation
self.t_obs = 0
obs, _, _, _, info = self.step(
self._initial_action, preappend_actions=preappend_actions
)
info = {"time_index": -1}
return obs, info
[docs] def reset_fingers(self, reset_wait_time: int = 3000):
"""Reset fingers to initial position.
This resets neither the frontend nor the cube. This method is
supposed to be used for 'soft resets' between episodes in one
job.
"""
assert self.platform is not None, "Environment is not initialised."
action = self.platform.Action(position=self._initial_finger_position)
for _ in range(reset_wait_time):
t = self._append_desired_action(action)
self.t_obs = t
# reset step_count even though this is not a full reset
self.step_count = 0
# block until reset wait time has passed and return observation
obs, info = self._create_observation(t, self._initial_action)
return obs, info
[docs] def sample_new_goal(self, goal=None):
"""Sample a new desired goal."""
if goal is None:
self.active_goal = task.sample_goal(difficulty=self.difficulty)
else:
self.active_goal.position = np.array(goal["position"], dtype=np.float32)
self.active_goal.orientation = np.array(
goal["orientation"], dtype=np.float32
)
# update goal visualisation
if self.visualization and not self.image_obs:
self.goal_marker.set_state(
self.active_goal.position, self.active_goal.orientation
)
def _get_pose_delay(self, camera_observation, t):
"""Get delay between when the object pose was captured and now."""
return t / 1000.0 - camera_observation.cameras[0].timestamp
def _clip_observation(self, obs):
"""Clip observation."""
def clip_recursively(o, space):
for k, v in space.spaces.items():
if isinstance(v, gym.spaces.Box):
np.clip(o[k], v.low, v.high, dtype=v.dtype, out=o[k])
else:
clip_recursively(o[k], v)
clip_recursively(obs, self.observation_space)
def _create_observation(self, t: int, action: np.ndarray) -> Tuple[dict, dict]:
assert self.platform is not None, "Environment is not initialised."
robot_observation = self.platform.get_robot_observation(t)
camera_observation = self.platform.get_camera_observation(t)
object_observation = camera_observation.object_pose
info: Dict[str, Any] = {"time_index": t}
# camera observation
camera_obs_processed = {
"object_position": object_observation.position.astype(np.float32),
"object_orientation": object_observation.orientation.astype(np.float32),
# time elapsed since capturing of pose in seconds
"delay": np.array(
[self._get_pose_delay(camera_observation, t)], dtype=np.float32
),
"confidence": np.array([object_observation.confidence], dtype=np.float32),
}
if self.image_obs:
if len(camera_observation.cameras[0].image.shape) == 2:
# images from real platform have to be debayered
images = np.array([cv2.cvtColor(cam.image, cv2.COLOR_BAYER_BG2RGB) for cam in camera_observation.cameras])
else:
# RGB camera images created with software renderer
# (using openGL requires GUI to run)
images = np.array([cam.image for cam in camera_observation.cameras])
# convert to channel first
images = np.transpose(images, (0, 3, 1, 2))
camera_obs_processed["images"] = images
if self.keypoint_obs:
camera_obs_processed["object_keypoints"] = get_keypoints_from_pose(
object_observation
)
if self._old_camera_obs is not None:
# handle quaternion flipping
q_sum = (
self._old_camera_obs["object_orientation"]
+ camera_obs_processed["object_orientation"]
)
if np.linalg.norm(q_sum) < 0.2:
camera_obs_processed["object_orientation"] = -camera_obs_processed[
"object_orientation"
]
self._old_camera_obs = camera_obs_processed
# goal represented as position and orientation
desired_goal_pos_ori = {
"object_position": self.active_goal.position.astype(np.float32),
"object_orientation": self.active_goal.orientation.astype(np.float32),
}
achieved_goal_pos_ori = {
"object_position": camera_obs_processed["object_position"],
"object_orientation": camera_obs_processed["object_orientation"],
}
# goal as shown to agent
if self.difficulty == 4:
if self.keypoint_obs:
desired_goal = {
"object_keypoints": get_keypoints_from_pose(self.active_goal)
}
achieved_goal = {
"object_keypoints": camera_obs_processed["object_keypoints"]
}
else:
desired_goal = desired_goal_pos_ori
achieved_goal = achieved_goal_pos_ori
else:
desired_goal = {
"object_position": self.active_goal.position.astype(np.float32)
}
achieved_goal = {"object_position": camera_obs_processed["object_position"]}
# fingertip positions and velocities
fingertip_position, fingertip_velocity = self.platform.forward_kinematics(
robot_observation.position, robot_observation.velocity
)
fingertip_position = np.array(fingertip_position, dtype=np.float32)
fingertip_velocity = np.array(fingertip_velocity, dtype=np.float32)
observation = {
"robot_observation": {
"position": robot_observation.position.astype(np.float32),
"velocity": robot_observation.velocity.astype(np.float32),
"torque": robot_observation.torque.astype(np.float32),
"fingertip_force": robot_observation.tip_force.astype(np.float32),
"fingertip_position": fingertip_position,
"fingertip_velocity": fingertip_velocity,
"robot_id": np.array([self.robot_id], dtype=np.int_),
},
"camera_observation": camera_obs_processed,
"action": action.astype(np.float32),
"desired_goal": desired_goal,
"achieved_goal": achieved_goal,
}
# clip observation
self._clip_observation(observation)
has_achieved = self.has_achieved(achieved_goal_pos_ori, desired_goal_pos_ori)
info["has_achieved"] = has_achieved
info["desired_goal"] = desired_goal_pos_ori
return observation, info
def _gym_action_to_robot_action(self, gym_action: np.ndarray):
assert self.platform is not None, "Environment is not initialised."
# robot action is torque
robot_action = self.platform.Action(torque=gym_action)
return robot_action
[docs] def render(self, mode: str = "human"):
"""Does nothing. See :class:`SimTriFingerCubeEnv` for how to enable
visualization."""
pass
[docs] def reset_cube(self):
"""Replay a recorded trajectory to move cube to center of arena."""
for position in self._cube_reset_traj[: self._reset_trajectory_length : 2]:
robot_action = self.platform.Action(position=position)
self._append_desired_action(robot_action)