Source code for rrc_simulation.gym_wrapper.envs.cube_env

"""Gym environment for the Real Robot Challenge Phase 1 (Simulation)."""
import enum

import numpy as np
import gym

from rrc_simulation import TriFingerPlatform
from rrc_simulation import visual_objects
from rrc_simulation.tasks import move_cube


class RandomInitializer:
    """Initializer that samples random initial states and goals."""

    def __init__(self, difficulty):
        """Initialize.

        Args:
            difficulty (int): Difficulty level for sampling goals.
        """
        self.difficulty = difficulty

    def get_initial_state(self):
        """Get a random initial object pose (always on the ground)."""
        return move_cube.sample_goal(difficulty=-1)

    def get_goal(self):
        """Get a random goal depending on the difficulty."""
        return move_cube.sample_goal(difficulty=self.difficulty)
class FixedInitializer:
    """Initializer that uses fixed values for initial pose and goal."""

    def __init__(self, difficulty, initial_state, goal):
        """Initialize.

        Args:
            difficulty (int): Difficulty level of the goal.  This is still
                needed even for a fixed goal, as it is also used for computing
                the reward (the cost function differs between levels).
            initial_state (move_cube.Pose): Initial pose of the object.
            goal (move_cube.Pose): Goal pose of the object.

        Raises:
            Exception: If initial_state or goal are not valid.  See
                :meth:`move_cube.validate_goal` for more information.
        """
        move_cube.validate_goal(initial_state)
        move_cube.validate_goal(goal)
        self.difficulty = difficulty
        self.initial_state = initial_state
        self.goal = goal

    def get_initial_state(self):
        """Get the initial state that was set in the constructor."""
        return self.initial_state

    def get_goal(self):
        """Get the goal that was set in the constructor."""
        return self.goal
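
# Illustrative sketch (not part of the original module): constructing the two
# initializers.  The keyword arguments of ``move_cube.Pose`` are assumed here
# from how poses are used below; check ``move_cube`` for the exact signature.
#
#     random_init = RandomInitializer(difficulty=2)
#     start = random_init.get_initial_state()  # pose on the ground
#     goal = random_init.get_goal()            # goal sampled for level 2
#
#     fixed_init = FixedInitializer(
#         difficulty=2,
#         initial_state=move_cube.Pose(
#             position=[0.0, 0.0, 0.0325], orientation=[0.0, 0.0, 0.0, 1.0]
#         ),
#         goal=move_cube.Pose(
#             position=[0.05, 0.05, 0.0325], orientation=[0.0, 0.0, 0.0, 1.0]
#         ),
#     )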
class ActionType(enum.Enum):
    """Different action types that can be used to control the robot."""

    #: Use pure torque commands.  The action is a list of torques (one per
    #: joint) in this case.
    TORQUE = enum.auto()
    #: Use joint position commands.  The action is a list of angular joint
    #: positions (one per joint) in this case.  Internally a PD controller is
    #: executed for each action to determine the torques that are applied to
    #: the robot.
    POSITION = enum.auto()
    #: Use both torque and position commands.  In this case the action is a
    #: dictionary with keys "torque" and "position" which contain the
    #: corresponding lists of values (see above).  The torques resulting from
    #: the position controller are added to the torques in the action before
    #: applying them to the robot.
    TORQUE_AND_POSITION = enum.auto()
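
# Illustrative sketch (not part of the original module): what an action looks
# like for each type.  The TriFinger robot has three fingers with three joints
# each, so length-9 vectors are assumed here for torque and position actions.
#
#     torque_action = [0.0] * 9        # ActionType.TORQUE
#     position_action = [0.0] * 9      # ActionType.POSITION
#     combined_action = {              # ActionType.TORQUE_AND_POSITION
#         "torque": [0.0] * 9,
#         "position": [0.0] * 9,
#     }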
class CubeEnv(gym.GoalEnv):
    """Gym environment for moving cubes with simulated TriFingerPro."""
    def __init__(
        self,
        initializer,
        action_type=ActionType.POSITION,
        frameskip=1,
        visualization=False,
    ):
        """Initialize.

        Args:
            initializer: Initializer class for providing initial cube pose and
                goal pose.  See :class:`RandomInitializer` and
                :class:`FixedInitializer`.
            action_type (ActionType): Specify which type of actions to use.
                See :class:`ActionType` for details.
            frameskip (int): Number of actual control steps to be performed in
                one call of step().
            visualization (bool): If true, the pyBullet GUI is run for
                visualization.
        """
        # Basic initialization
        # ====================
        self.initializer = initializer
        self.action_type = action_type
        self.visualization = visualization

        # TODO: The name "frameskip" makes sense for an Atari environment but
        # not really for our scenario.  The name is also misleading as
        # "frameskip = 1" suggests that one frame is skipped while it actually
        # means "do one step per step" (i.e. no skip).
        if frameskip < 1:
            raise ValueError("frameskip cannot be less than 1.")
        self.frameskip = frameskip

        # will be initialized in reset()
        self.platform = None

        # Create the action and observation spaces
        # ========================================
        spaces = TriFingerPlatform.spaces

        object_state_space = gym.spaces.Dict(
            {
                "position": spaces.object_position.gym,
                "orientation": spaces.object_orientation.gym,
            }
        )

        if self.action_type == ActionType.TORQUE:
            self.action_space = spaces.robot_torque.gym
        elif self.action_type == ActionType.POSITION:
            self.action_space = spaces.robot_position.gym
        elif self.action_type == ActionType.TORQUE_AND_POSITION:
            self.action_space = gym.spaces.Dict(
                {
                    "torque": spaces.robot_torque.gym,
                    "position": spaces.robot_position.gym,
                }
            )
        else:
            raise ValueError("Invalid action_type")

        self.observation_space = gym.spaces.Dict(
            {
                "observation": gym.spaces.Dict(
                    {
                        "position": spaces.robot_position.gym,
                        "velocity": spaces.robot_velocity.gym,
                        "torque": spaces.robot_torque.gym,
                    }
                ),
                "desired_goal": object_state_space,
                "achieved_goal": object_state_space,
            }
        )
    def compute_reward(self, achieved_goal, desired_goal, info):
        """Compute the reward for the given achieved and desired goal.

        Args:
            achieved_goal (dict): Current pose of the object.
            desired_goal (dict): Goal pose of the object.
            info (dict): An info dictionary containing a field "difficulty"
                which specifies the difficulty level.

        Returns:
            float: The reward that corresponds to the provided achieved goal
            w.r.t. the desired goal.  Note that the following should always
            hold true::

                ob, reward, done, info = env.step()
                assert reward == env.compute_reward(
                    ob['achieved_goal'],
                    ob['desired_goal'],
                    info,
                )
        """
        return -move_cube.evaluate_state(
            move_cube.Pose.from_dict(desired_goal),
            move_cube.Pose.from_dict(achieved_goal),
            info["difficulty"],
        )
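
    # Illustrative sketch (not part of the original module): the invariant from
    # the docstring above can be checked after any step, e.g. when replaying an
    # episode from logged observations.  With frameskip > 1 the returned reward
    # is summed over the skipped steps, so this check assumes frameskip == 1.
    #
    #     ob, reward, done, info = env.step(action)
    #     recomputed = env.compute_reward(
    #         ob["achieved_goal"], ob["desired_goal"], info
    #     )
    #     assert reward == recomputed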
    def step(self, action):
        """Run one timestep of the environment's dynamics.

        When end of episode is reached, you are responsible for calling
        ``reset()`` to reset this environment's state.

        Args:
            action: An action provided by the agent (depends on the selected
                :class:`ActionType`).

        Returns:
            tuple:

            - observation (dict): agent's observation of the current
              environment.
            - reward (float): amount of reward returned after previous action.
            - done (bool): whether the episode has ended, in which case further
              step() calls will return undefined results.
            - info (dict): info dictionary containing the difficulty level of
              the goal.
        """
        if self.platform is None:
            raise RuntimeError("Call `reset()` before starting to step.")

        if not self.action_space.contains(action):
            raise ValueError(
                "Given action is not contained in the action space."
            )

        num_steps = self.frameskip

        # ensure episode length is not exceeded due to frameskip
        step_count_after = self.step_count + num_steps
        if step_count_after > move_cube.episode_length:
            excess = step_count_after - move_cube.episode_length
            num_steps = max(1, num_steps - excess)

        reward = 0.0
        for _ in range(num_steps):
            self.step_count += 1
            if self.step_count > move_cube.episode_length:
                raise RuntimeError("Exceeded number of steps for one episode.")

            # send action to robot
            robot_action = self._gym_action_to_robot_action(action)
            t = self.platform.append_desired_action(robot_action)

            # Use observations of step t + 1 to follow what would be expected
            # in a typical gym environment.  Note that on the real robot, this
            # will not be possible.
            observation = self._create_observation(t + 1)

            reward += self.compute_reward(
                observation["achieved_goal"],
                observation["desired_goal"],
                self.info,
            )

        is_done = self.step_count == move_cube.episode_length

        return observation, reward, is_done, self.info
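
    # Illustrative sketch (not part of the original module): with frameskip > 1
    # the same action is applied for several control steps and the per-step
    # rewards are summed, e.g.
    #
    #     env = CubeEnv(RandomInitializer(1), frameskip=100)
    #     env.reset()
    #     ob, reward, done, info = env.step(env.action_space.sample())
    #     # `reward` is the sum over the 100 simulated control steps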
    def reset(self):
        """Reset the simulation and return the initial observation."""
        # reset simulation
        del self.platform

        # initialize simulation
        initial_robot_position = (
            TriFingerPlatform.spaces.robot_position.default
        )
        initial_object_pose = self.initializer.get_initial_state()
        goal_object_pose = self.initializer.get_goal()

        self.platform = TriFingerPlatform(
            visualization=self.visualization,
            initial_robot_position=initial_robot_position,
            initial_object_pose=initial_object_pose,
        )

        self.goal = {
            "position": goal_object_pose.position,
            "orientation": goal_object_pose.orientation,
        }

        # visualize the goal
        if self.visualization:
            self.goal_marker = visual_objects.CubeMarker(
                width=0.065,
                position=goal_object_pose.position,
                orientation=goal_object_pose.orientation,
                physicsClientId=self.platform.simfinger._pybullet_client_id,
            )

        self.info = {"difficulty": self.initializer.difficulty}

        self.step_count = 0

        return self._create_observation(0)
    def seed(self, seed=None):
        """Sets the seed for this env's random number generator.

        .. note::

           Spaces need to be seeded separately.  E.g. if you want to sample
           actions directly from the action space using
           ``env.action_space.sample()`` you can set a seed there using
           ``env.action_space.seed()``.

        Returns:
            List of seeds used by this environment.  This environment only
            uses a single seed, so the list contains only one element.
        """
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        move_cube.random = self.np_random
        return [seed]
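
    # Illustrative sketch (not part of the original module): seeding the
    # environment and, separately, the action space as pointed out in the note
    # above.
    #
    #     env.seed(42)
    #     env.action_space.seed(42)
    #     action = env.action_space.sample()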
    def _create_observation(self, t):
        robot_observation = self.platform.get_robot_observation(t)
        object_observation = self.platform.get_object_pose(t)

        observation = {
            "observation": {
                "position": robot_observation.position,
                "velocity": robot_observation.velocity,
                "torque": robot_observation.torque,
            },
            "desired_goal": self.goal,
            "achieved_goal": {
                "position": object_observation.position,
                "orientation": object_observation.orientation,
            },
        }
        return observation

    def _gym_action_to_robot_action(self, gym_action):
        # construct robot action depending on action type
        if self.action_type == ActionType.TORQUE:
            robot_action = self.platform.Action(torque=gym_action)
        elif self.action_type == ActionType.POSITION:
            robot_action = self.platform.Action(position=gym_action)
        elif self.action_type == ActionType.TORQUE_AND_POSITION:
            robot_action = self.platform.Action(
                torque=gym_action["torque"], position=gym_action["position"]
            )
        else:
            raise ValueError("Invalid action_type")

        return robot_action
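
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): running one episode with
# random position actions.  Difficulty level, frameskip, and seed are arbitrary
# values chosen for illustration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    initializer = RandomInitializer(difficulty=1)
    env = CubeEnv(
        initializer,
        action_type=ActionType.POSITION,
        frameskip=100,
        visualization=False,
    )
    env.seed(42)
    env.action_space.seed(42)

    observation = env.reset()
    accumulated_reward = 0.0
    is_done = False
    while not is_done:
        action = env.action_space.sample()
        observation, reward, is_done, info = env.step(action)
        accumulated_reward += reward

    print("Accumulated reward:", accumulated_reward)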