Source code for trifinger_simulation.trifinger_platform

import copy
import enum
import pickle
import numpy as np
import gym
from types import SimpleNamespace
import typing

from .tasks import move_cube, rearrange_dice
from .sim_finger import SimFinger, int_to_rgba
from . import camera, collision_objects, trifingerpro_limits


[docs]class ObjectType(enum.Enum): """Enumeration of supported object types.""" NONE = 0 COLORED_CUBE = 1 DICE = 2
[docs]class ObjectPose: """A pure-python copy of trifinger_object_tracking::ObjectPose.""" __slots__ = ["position", "orientation", "timestamp", "confidence"] def __init__(self): #: array: Position (x, y, z) of the object. Units are meters. self.position = np.zeros(3) #: array: Orientation of the object as (x, y, z, w) quaternion. self.orientation = np.zeros(4) #: float: Timestamp when the pose was observed. self.timestamp = 0.0 #: float: Estimate of the confidence for this pose observation. self.confidence = 1.0
[docs]class CameraObservation: """Pure-python copy of trifinger_cameras.camera.CameraObservation.""" __slots__ = ["image", "timestamp"] def __init__(self): #: array: The image. self.image = None #: float: Timestamp when the image was received. self.timestamp = None
[docs]class TriCameraObservation: """Python version of trifinger_cameras::TriCameraObservation. This is a pure-python implementation of trifinger_cameras::TriCameraObservation, so we don't need to depend on trifinger_cameras here. """ __slots__ = ["cameras"] def __init__(self): #: list of :class:`CameraObservation`: List of observations of cameras #: "camera60", "camera180" and "camera300" (in this order). self.cameras = [CameraObservation() for i in range(3)]
[docs]class TriCameraObjectObservation: """Python version of trifinger_object_tracking::TriCameraObjectObservation. This is a pure-python implementation of trifinger_object_tracking::TriCameraObjectObservation, so we don't need to depend on trifinger_object_tracking here. """ __slots__ = ["cameras", "object_pose", "filtered_object_pose"] def __init__(self): #: list of :class:`CameraObservation`: List of observations of cameras #: "camera60", "camera180" and "camera300" (in this order). self.cameras = [CameraObservation() for i in range(3)] #: ObjectPose: Pose of the object in world coordinates. self.object_pose = ObjectPose() #: ObjectPose: Filtered pose of the object in world coordinates. In #: simulation, this is the same as the unfiltered object_pose. self.filtered_object_pose = ObjectPose()
[docs]class TriFingerPlatform: """ Wrapper around the simulation providing the same interface as ``robot_interfaces::TriFingerPlatformFrontend``. The following methods of the robot_interfaces counterpart are not supported: - get_robot_status() - wait_until_timeindex() """ # Create the action and observation spaces # ======================================== spaces = SimpleNamespace() spaces.robot_torque = trifingerpro_limits.robot_torque spaces.robot_position = trifingerpro_limits.robot_position spaces.robot_velocity = trifingerpro_limits.robot_velocity spaces.object_position = trifingerpro_limits.object_position spaces.object_orientation = trifingerpro_limits.object_orientation # for convenience, we also create the respective gym spaces spaces.robot_torque.gym = gym.spaces.Box( low=spaces.robot_torque.low, high=spaces.robot_torque.high ) spaces.robot_position.gym = gym.spaces.Box( low=spaces.robot_position.low, high=spaces.robot_position.high ) spaces.robot_velocity.gym = gym.spaces.Box( low=spaces.robot_velocity.low, high=spaces.robot_velocity.high ) spaces.object_position.gym = gym.spaces.Box( low=spaces.object_position.low, high=spaces.object_position.high ) spaces.object_orientation.gym = gym.spaces.Box( low=spaces.object_orientation.low, high=spaces.object_orientation.high )
[docs] def __init__( self, visualization: bool = False, initial_robot_position: typing.Sequence[float] = None, initial_object_pose=None, enable_cameras: bool = False, time_step_s: float = 0.001, object_type: ObjectType = ObjectType.COLORED_CUBE, ): """Initialize. Args: visualization: Set to true to run visualization. initial_robot_position: Initial robot joint angles initial_object_pose: Only if ``object_type == COLORED_CUBE``: Initial pose for the manipulation object. Can be any object with attributes ``position`` (x, y, z) and ``orientation`` (x, y, z, w). If not set, a default pose is used. enable_cameras: Set to true to enable rendered images in the camera observations. If false camera observations are still available but the images will not be initialized. By default this is disabled as rendering of images takes a lot of computational power. Therefore the cameras should only be enabled if the images are actually used. time_step_s: Simulation time step duration in seconds. object_type: Which type of object to load. This also influences some other aspects: When using the cube, the camera observation will contain an attribute ``object_pose``. """ #: Camera rate in frames per second. Observations of camera and #: object pose will only be updated with this rate. self.camera_rate_fps = 10 #: Set to true to render camera observations self.enable_cameras = enable_cameras #: Simulation time step self._time_step = time_step_s # first camera update in the first step self._next_camera_update_step = 0 # Initialize robot, object and cameras # ==================================== self.simfinger = SimFinger( finger_type="trifingerpro", time_step=self._time_step, enable_visualization=visualization, ) if initial_robot_position is None: initial_robot_position = self.spaces.robot_position.default self.simfinger.reset_finger_positions_and_velocities( initial_robot_position ) if initial_object_pose is None: initial_object_pose = move_cube.Pose( position=self.spaces.object_position.default, orientation=self.spaces.object_orientation.default, ) self._has_object_tracking = False self.object_type = object_type if object_type == ObjectType.COLORED_CUBE: self.cube = collision_objects.ColoredCubeV2( position=initial_object_pose.position, orientation=initial_object_pose.orientation, pybullet_client_id=self.simfinger._pybullet_client_id, ) self._has_object_tracking = True elif object_type == ObjectType.DICE: die_mass = 0.012 # use a random goal for initial positions initial_positions = rearrange_dice.sample_goal() self.dice = [ collision_objects.Cube( position=pos, half_width=rearrange_dice.DIE_WIDTH / 2, mass=die_mass, color_rgba=int_to_rgba(0x0A7DCF), ) for pos in initial_positions ] self.tricamera = camera.TriFingerCameras( pybullet_client_id=self.simfinger._pybullet_client_id ) # Forward some methods for convenience # ==================================== # forward "RobotFrontend" methods directly to simfinger self.Action = self.simfinger.Action self.get_desired_action = self.simfinger.get_desired_action self.get_applied_action = self.simfinger.get_applied_action self.get_timestamp_ms = self.simfinger.get_timestamp_ms self.get_current_timeindex = self.simfinger.get_current_timeindex self.get_robot_observation = self.simfinger.get_observation # forward kinematics directly to simfinger self.forward_kinematics = self.simfinger.kinematics.forward_kinematics # Initialize log # ============== self._action_log = { "initial_robot_position": copy.copy(initial_robot_position), "initial_object_pose": copy.copy(initial_object_pose), "actions": [], } # get initial camera observation self._camera_observation_t = self._get_current_camera_observation(0)
def get_time_step(self): """Get simulation time step in seconds.""" return self._time_step def _compute_camera_update_step_interval(self): return (1.0 / self.camera_rate_fps) / self._time_step
[docs] def append_desired_action(self, action): """ Call :meth:`pybullet.SimFinger.append_desired_action` and add the action to the action log. Arguments/return value are the same as for :meth:`pybullet.SimFinger.append_desired_action`. """ # update camera and object observations only with the rate of the # cameras next_t = self.simfinger._t + 1 has_camera_update = next_t >= self._next_camera_update_step if has_camera_update: self._next_camera_update_step += ( self._compute_camera_update_step_interval() ) self._camera_observation_t = self._get_current_camera_observation() t = self.simfinger.append_desired_action(action) # The correct timestamp can only be acquired now that t is given. # Update it accordingly in the object and camera observations if has_camera_update: camera_timestamp_s = self.get_timestamp_ms(t) / 1000 for i in range(len(self._camera_observation_t.cameras)): self._camera_observation_t.cameras[ i ].timestamp = camera_timestamp_s if self._has_object_tracking: self._camera_observation_t.object_pose.timestamp = ( camera_timestamp_s ) self._camera_observation_t.filtered_object_pose.timestamp = ( camera_timestamp_s ) # write the desired action to the log camera_obs = self.get_camera_observation(t) robot_obs = self.get_robot_observation(t) log_entry = { "t": t, "action": action, "robot_observation": robot_obs, } if self._has_object_tracking: log_entry["object_pose"] = camera_obs.object_pose # make a deep copy of log_entry to ensure all reference ties are cut self._action_log["actions"].append(copy.deepcopy(log_entry)) return t
def _get_current_object_pose(self): assert self._has_object_tracking cube_state = self.cube.get_state() pose = ObjectPose() pose.position = np.asarray(cube_state[0]) pose.orientation = np.asarray(cube_state[1]) pose.confidence = 1.0 return pose def _get_current_camera_observation(self, t=None): if self.enable_cameras: images = self.tricamera.get_images() else: images = [None] * 3 if self._has_object_tracking: observation = TriCameraObjectObservation() else: observation = TriCameraObservation() # NOTE: The timestamp can only be set correctly after time step t # is actually reached. Therefore, this is set to None here and # filled with the proper value later. if t is None: timestamp = None else: timestamp = self.get_timestamp_ms(t) for i, image in enumerate(images): observation.cameras[i].image = image observation.cameras[i].timestamp = timestamp if self._has_object_tracking: observation.object_pose = self._get_current_object_pose() observation.filtered_object_pose = self._get_current_object_pose() return observation
[docs] def get_camera_observation( self, t: int ) -> typing.Union[TriCameraObservation, TriCameraObjectObservation]: """Get camera observation at time step t. .. important:: Actual images are only rendered if the class was created with `enable_cameras=True` in the constructor, otherwise the images in the observation are set to None. Other fields are still valid, though. Args: t: The time index of the step for which the observation is requested. Only the value returned by the last call of :meth:`~append_desired_action` is valid. Returns: Observations of the three cameras. Depending of the object type used, this may also contain the object pose. Raises: ValueError: If invalid time index ``t`` is passed. """ current_t = self.simfinger._t if t < 0: raise ValueError("Cannot access time index less than zero.") elif t == current_t or t == current_t + 1: return self._camera_observation_t else: raise ValueError( "Given time index t has to match with index of the current" " step or the next one." )
[docs] def store_action_log(self, filename): """Store the action log to a JSON file. Args: filename (str): Path to the JSON file to which the log shall be written. If the file exists already, it will be overwritten. """ t = self.get_current_timeindex() camera_obs = self.get_camera_observation(t) if self._has_object_tracking: self._action_log["final_object_pose"] = { "t": t, "pose": camera_obs.object_pose, } with open(filename, "wb") as fh: pickle.dump(self._action_log, fh)