Source code for rrc_example_package.cube_trajectory_env

"""Example Gym environment for the RRC 2021 Phase 2."""
import enum
import typing

import gym
import numpy as np

import robot_fingers
import trifinger_simulation
import trifinger_simulation.visual_objects
from trifinger_simulation import trifingerpro_limits

import trifinger_simulation.tasks.move_cube_on_trajectory as task


class ActionType(enum.Enum):
    """Different action types that can be used to control the robot."""

    #: Use pure torque commands. The action is a list of torques (one per
    #: joint) in this case.
    TORQUE = enum.auto()
    #: Use joint position commands. The action is a list of angular joint
    #: positions (one per joint) in this case. Internally a PD controller is
    #: executed for each action to determine the torques that are applied to
    #: the robot.
    POSITION = enum.auto()
    #: Use both torque and position commands. In this case the action is a
    #: dictionary with keys "torque" and "position" which contain the
    #: corresponding lists of values (see above). The torques resulting from
    #: the position controller are added to the torques in the action before
    #: applying them to the robot.
    TORQUE_AND_POSITION = enum.auto()
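
# Illustrative sketch of the action formats (assuming the standard
# TriFingerPro configuration of nine joints, three per finger):
#
#   TORQUE:               a length-9 array of joint torques
#   POSITION:             a length-9 array of target joint angles
#   TORQUE_AND_POSITION:  {"torque": <9 torques>, "position": <9 angles>}
#
# The exact bounds are given by trifingerpro_limits and are used to build the
# action spaces in BaseCubeTrajectoryEnv.__init__ below.
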
class BaseCubeTrajectoryEnv(gym.GoalEnv):
    """Gym environment for moving cubes with TriFingerPro."""

    def __init__(
        self,
        goal_trajectory: typing.Optional[task.Trajectory] = None,
        action_type: ActionType = ActionType.POSITION,
        step_size: int = 1,
    ):
        """Initialize.

        Args:
            goal_trajectory: Goal trajectory for the cube. If ``None`` a new
                random trajectory is sampled upon reset.
            action_type: Specify which type of actions to use.
                See :class:`ActionType` for details.
            step_size: Number of actual control steps to be performed in one
                call of step().
        """
        # Basic initialization
        # ====================
        if goal_trajectory is not None:
            task.validate_goal(goal_trajectory)
        self.goal = goal_trajectory

        self.action_type = action_type

        if step_size < 1:
            raise ValueError("step_size cannot be less than 1.")
        self.step_size = step_size

        # will be initialized in reset()
        self.platform = None

        # Create the action and observation spaces
        # ========================================

        robot_torque_space = gym.spaces.Box(
            low=trifingerpro_limits.robot_torque.low,
            high=trifingerpro_limits.robot_torque.high,
        )
        robot_position_space = gym.spaces.Box(
            low=trifingerpro_limits.robot_position.low,
            high=trifingerpro_limits.robot_position.high,
        )
        robot_velocity_space = gym.spaces.Box(
            low=trifingerpro_limits.robot_velocity.low,
            high=trifingerpro_limits.robot_velocity.high,
        )

        object_state_space = gym.spaces.Dict(
            {
                "position": gym.spaces.Box(
                    low=trifingerpro_limits.object_position.low,
                    high=trifingerpro_limits.object_position.high,
                ),
                "orientation": gym.spaces.Box(
                    low=trifingerpro_limits.object_orientation.low,
                    high=trifingerpro_limits.object_orientation.high,
                ),
            }
        )

        if self.action_type == ActionType.TORQUE:
            self.action_space = robot_torque_space
            self._initial_action = trifingerpro_limits.robot_torque.default
        elif self.action_type == ActionType.POSITION:
            self.action_space = robot_position_space
            self._initial_action = trifingerpro_limits.robot_position.default
        elif self.action_type == ActionType.TORQUE_AND_POSITION:
            self.action_space = gym.spaces.Dict(
                {
                    "torque": robot_torque_space,
                    "position": robot_position_space,
                }
            )
            self._initial_action = {
                "torque": trifingerpro_limits.robot_torque.default,
                "position": trifingerpro_limits.robot_position.default,
            }
        else:
            raise ValueError("Invalid action_type")

        self.observation_space = gym.spaces.Dict(
            {
                "robot_observation": gym.spaces.Dict(
                    {
                        "position": robot_position_space,
                        "velocity": robot_velocity_space,
                        "torque": robot_torque_space,
                    }
                ),
                "object_observation": gym.spaces.Dict(
                    {
                        "position": object_state_space["position"],
                        "orientation": object_state_space["orientation"],
                    }
                ),
                "action": self.action_space,
                "desired_goal": object_state_space["position"],
                "achieved_goal": object_state_space["position"],
            }
        )

    def compute_reward(
        self,
        achieved_goal: task.Position,
        desired_goal: task.Position,
        info: dict,
    ) -> float:
        """Compute the reward for the given achieved and desired goal.

        Args:
            achieved_goal: Current position of the object.
            desired_goal: Goal position of the current trajectory step.
            info: An info dictionary containing a field "time_index" which
                contains the time index of the achieved_goal.

        Returns:
            The reward that corresponds to the provided achieved goal w.r.t.
            the desired goal. Note that the following should always hold
            true::

                ob, reward, done, info = env.step(action)
                assert reward == env.compute_reward(
                    ob['achieved_goal'],
                    ob['desired_goal'],
                    info,
                )
        """
        # This is just a sanity check to verify that the given desired_goal
        # actually matches the active goal in the trajectory.
        active_goal = np.asarray(
            task.get_active_goal(
                self.info["trajectory"], self.info["time_index"]
            )
        )
        assert np.all(active_goal == desired_goal), "{}: {} != {}".format(
            info["time_index"], active_goal, desired_goal
        )

        return -task.evaluate_state(
            info["trajectory"], info["time_index"], achieved_goal
        )

    def step(self, action):
        """Run one timestep of the environment's dynamics.

        When the end of the episode is reached, you are responsible for
        calling ``reset()`` to reset this environment's state.

        Args:
            action: An action provided by the agent (depends on the selected
                :class:`ActionType`).

        Returns:
            tuple:

            - observation (dict): agent's observation of the current
              environment.
            - reward (float): amount of reward returned after previous action.
            - done (bool): whether the episode has ended, in which case
              further step() calls will return undefined results.
            - info (dict): info dictionary containing the current time index.
        """
        raise NotImplementedError()

    def reset(self):
        raise NotImplementedError()

    def seed(self, seed=None):
        """Sets the seed for this env's random number generator.

        .. note::

           Spaces need to be seeded separately. E.g. if you want to sample
           actions directly from the action space using
           ``env.action_space.sample()`` you can set a seed there using
           ``env.action_space.seed()``.

        Returns:
            List of seeds used by this environment. This environment only
            uses a single seed, so the list contains only one element.
        """
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        task.seed(seed)
        return [seed]

    def _create_observation(self, t, action):
        robot_observation = self.platform.get_robot_observation(t)
        camera_observation = self.platform.get_camera_observation(t)
        object_observation = camera_observation.filtered_object_pose

        active_goal = np.asarray(
            task.get_active_goal(self.info["trajectory"], t)
        )

        observation = {
            "robot_observation": {
                "position": robot_observation.position,
                "velocity": robot_observation.velocity,
                "torque": robot_observation.torque,
            },
            "object_observation": {
                "position": object_observation.position,
                "orientation": object_observation.orientation,
            },
            "action": action,
            "desired_goal": active_goal,
            "achieved_goal": object_observation.position,
        }
        return observation

    def _gym_action_to_robot_action(self, gym_action):
        # construct robot action depending on action type
        if self.action_type == ActionType.TORQUE:
            robot_action = self.platform.Action(torque=gym_action)
        elif self.action_type == ActionType.POSITION:
            robot_action = self.platform.Action(position=gym_action)
        elif self.action_type == ActionType.TORQUE_AND_POSITION:
            robot_action = self.platform.Action(
                torque=gym_action["torque"], position=gym_action["position"]
            )
        else:
            raise ValueError("Invalid action_type")

        return robot_action
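
# Usage sketch for the compute_reward() contract above (assuming
# step_size == 1; ``env`` and ``action`` are placeholders for a concrete
# subclass instance and a valid action):
#
#     obs, reward, done, info = env.step(action)
#     assert reward == env.compute_reward(
#         obs["achieved_goal"], obs["desired_goal"], info
#     )
#
# With step_size > 1 the reward returned by step() is the sum over all
# internal control steps, so the identity above only holds exactly for
# step_size == 1.
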
class SimCubeTrajectoryEnv(BaseCubeTrajectoryEnv):
    """Gym environment for moving cubes with simulated TriFingerPro."""

    def __init__(
        self,
        goal_trajectory: typing.Optional[task.Trajectory] = None,
        action_type: ActionType = ActionType.POSITION,
        step_size: int = 1,
        visualization: bool = False,
    ):
        """Initialize.

        Args:
            goal_trajectory: Goal trajectory for the cube. If ``None`` a new
                random trajectory is sampled upon reset.
            action_type (ActionType): Specify which type of actions to use.
                See :class:`ActionType` for details.
            step_size (int): Number of actual control steps to be performed in
                one call of step().
            visualization (bool): If true, the pyBullet GUI is run for
                visualization.
        """
        super().__init__(
            goal_trajectory=goal_trajectory,
            action_type=action_type,
            step_size=step_size,
        )
        self.visualization = visualization

    def step(self, action):
        """Run one timestep of the environment's dynamics.

        When the end of the episode is reached, you are responsible for
        calling ``reset()`` to reset this environment's state.

        Args:
            action: An action provided by the agent (depends on the selected
                :class:`ActionType`).

        Returns:
            tuple:

            - observation (dict): agent's observation of the current
              environment.
            - reward (float): amount of reward returned after previous action.
            - done (bool): whether the episode has ended, in which case
              further step() calls will return undefined results.
            - info (dict): info dictionary containing the current time index.
        """
        if self.platform is None:
            raise RuntimeError("Call `reset()` before starting to step.")

        if not self.action_space.contains(action):
            raise ValueError(
                "Given action is not contained in the action space."
            )

        num_steps = self.step_size

        # ensure episode length is not exceeded due to step_size
        step_count_after = self.step_count + num_steps
        if step_count_after > task.EPISODE_LENGTH:
            excess = step_count_after - task.EPISODE_LENGTH
            num_steps = max(1, num_steps - excess)

        reward = 0.0
        for _ in range(num_steps):
            self.step_count += 1
            if self.step_count > task.EPISODE_LENGTH:
                raise RuntimeError("Exceeded number of steps for one episode.")

            # send action to robot
            robot_action = self._gym_action_to_robot_action(action)
            t = self.platform.append_desired_action(robot_action)

            # update goal visualization
            if self.visualization:
                goal_position = task.get_active_goal(
                    self.info["trajectory"], t
                )
                self.goal_marker.set_state(goal_position, (0, 0, 0, 1))

            # Use observations of step t + 1 to follow what would be expected
            # in a typical gym environment. Note that on the real robot, this
            # will not be possible.
            self.info["time_index"] = t + 1

            # Alternatively use the observation of step t. This is the
            # observation from the moment before action_t is applied, i.e. the
            # result of that action is not yet visible in this observation.
            #
            # When using this observation, the resulting cumulative reward
            # should match exactly the one computed during replay (with the
            # above it will differ slightly).
            #
            # self.info["time_index"] = t

            observation = self._create_observation(
                self.info["time_index"], action
            )

            reward += self.compute_reward(
                observation["achieved_goal"],
                observation["desired_goal"],
                self.info,
            )

        is_done = self.step_count >= task.EPISODE_LENGTH

        return observation, reward, is_done, self.info

    def reset(self):
        """Reset the environment."""
        # hard-reset simulation
        del self.platform

        # initialize simulation
        initial_robot_position = trifingerpro_limits.robot_position.default
        # initialize cube at the centre
        initial_object_pose = task.move_cube.Pose(
            position=task.INITIAL_CUBE_POSITION
        )

        self.platform = trifinger_simulation.TriFingerPlatform(
            visualization=self.visualization,
            initial_robot_position=initial_robot_position,
            initial_object_pose=initial_object_pose,
        )

        # if no goal is given, sample one randomly
        if self.goal is None:
            trajectory = task.sample_goal()
        else:
            trajectory = self.goal

        # visualize the goal
        if self.visualization:
            self.goal_marker = trifinger_simulation.visual_objects.CubeMarker(
                width=task.move_cube._CUBE_WIDTH,
                position=trajectory[0][1],
                orientation=(0, 0, 0, 1),
                pybullet_client_id=self.platform.simfinger._pybullet_client_id,
            )

        self.info = {"time_index": -1, "trajectory": trajectory}

        self.step_count = 0

        return self._create_observation(0, self._initial_action)
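
# Visualization sketch (purely illustrative; assumes a machine with a
# display): passing visualization=True starts the pyBullet GUI, and reset()
# additionally creates a cube-shaped goal marker that step() moves along the
# goal trajectory.
#
#     env = SimCubeTrajectoryEnv(visualization=True, step_size=100)
#     observation = env.reset()
#     done = False
#     while not done:
#         observation, reward, done, info = env.step(
#             env.action_space.sample()
#         )
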
class RealRobotCubeTrajectoryEnv(BaseCubeTrajectoryEnv):
    """Gym environment for moving cubes with real TriFingerPro."""

    def step(self, action):
        """Run one timestep of the environment's dynamics.

        Important: ``reset()`` needs to be called before doing the first step.

        Args:
            action: An action provided by the agent (depends on the selected
                :class:`ActionType`).

        Returns:
            tuple:

            - observation (dict): agent's observation of the current
              environment.
            - reward (float): amount of reward returned after previous action.
            - done (bool): whether the episode has ended, in which case
              further step() calls will return undefined results.
            - info (dict): info dictionary containing the current time index.
        """
        if self.platform is None:
            raise RuntimeError("Call `reset()` before starting to step.")

        if not self.action_space.contains(action):
            raise ValueError(
                "Given action is not contained in the action space."
            )

        num_steps = self.step_size

        # ensure episode length is not exceeded due to step_size
        step_count_after = self.info["time_index"] + num_steps
        if step_count_after > task.EPISODE_LENGTH:
            excess = step_count_after - task.EPISODE_LENGTH
            num_steps = max(1, num_steps - excess)

        reward = 0.0
        for _ in range(num_steps):
            # send action to robot
            robot_action = self._gym_action_to_robot_action(action)
            t = self.platform.append_desired_action(robot_action)

            self.info["time_index"] = t

            observation = self._create_observation(t, action)

            reward += self.compute_reward(
                observation["achieved_goal"],
                observation["desired_goal"],
                self.info,
            )

            # make sure to not exceed the episode length
            if t >= task.EPISODE_LENGTH - 1:
                break

        is_done = t >= task.EPISODE_LENGTH

        return observation, reward, is_done, self.info

    def reset(self):
        """Reset the environment (can only be done once on the real robot)."""
        # cannot reset multiple times
        if self.platform is not None:
            raise RuntimeError(
                "Once started, this environment cannot be reset."
            )

        self.platform = robot_fingers.TriFingerPlatformWithObjectFrontend()

        # if no goal is given, sample one randomly
        if self.goal is None:
            trajectory = task.sample_goal()
        else:
            trajectory = self.goal

        self.info = {"time_index": -1, "trajectory": trajectory}

        # need to already do one step to get initial observation
        # TODO disable frameskip here?
        observation, _, _, _ = self.step(self._initial_action)

        return observation
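
if __name__ == "__main__":
    # Minimal demo (a sketch under the assumption that trifinger_simulation is
    # installed and the script is run headless): roll out a few steps in
    # simulation with random actions sampled from the action space. On the
    # real robot, use RealRobotCubeTrajectoryEnv instead and keep in mind that
    # it can only be reset once.
    demo_env = SimCubeTrajectoryEnv(
        goal_trajectory=None,  # sample a random goal trajectory on reset
        action_type=ActionType.POSITION,
        step_size=100,
        visualization=False,
    )
    observation = demo_env.reset()
    accumulated_reward = 0.0
    done = False
    for _ in range(20):
        action = demo_env.action_space.sample()
        observation, reward, done, info = demo_env.step(action)
        accumulated_reward += reward
        if done:
            break
    print("time index:", info["time_index"])
    print("accumulated reward:", accumulated_reward)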