1# Lint as: python3 2"""The on robot sensor classes.""" 3 4from typing import Any, Callable, Iterable, Optional, Sequence, Type, Text, Tuple, Union 5 6import gin 7import gym 8import numpy as np 9 10from pybullet_envs.minitaur.envs_v2.sensors import sensor 11 12from pybullet_envs.minitaur.envs_v2.utilities import noise_generators 13 14 15def _convert_to_np_array(inputs: Union[float, Tuple[float], np.ndarray], dim): 16 """Converts the inputs to a numpy array. 17 18 Args: 19 inputs: The input scalar or array. 20 dim: The dimension of the converted numpy array. 21 22 Returns: 23 The converted numpy array. 24 25 Raises: 26 ValueError: If the inputs is an array whose dimension does not match the 27 provided dimension. 28 """ 29 outputs = None 30 if isinstance(inputs, (tuple, np.ndarray)): 31 outputs = np.array(inputs) 32 else: 33 outputs = np.full(dim, inputs) 34 35 if len(outputs) != dim: 36 raise ValueError("The inputs array has a different dimension {}" 37 " than provided, which is {}.".format(len(outputs), dim)) 38 39 return outputs 40 41 42@gin.configurable 43class MotorAngleSensor(sensor.Sensor): 44 """A sensor that reads motor angles from the robot.""" 45 46 def __init__(self, 47 name: Text = "MotorAngle", 48 dtype: Type[Any] = np.float64, 49 lower_bound: Optional[Union[float, Iterable[float]]] = None, 50 upper_bound: Optional[Union[float, Iterable[float]]] = None, 51 noise_generator: Union[Callable[..., Any], 52 noise_generators.NoiseGenerator] = None, 53 sensor_latency: Union[float, Sequence[float]] = 0.0, 54 observe_sine_cosine: bool = False): 55 """Initializes the class. 56 57 Args: 58 name: The name of the sensor. 59 dtype: The datatype of this sensor. 60 lower_bound: The optional lower bounds of the sensor reading. If not 61 provided, will extract from the motor limits of the robot class. 62 upper_bound: The optional upper bounds of the sensor reading. If not 63 provided, will extract from the motor limits of the robot class. 64 noise_generator: Adds noise to the sensor readings. 65 sensor_latency: There are two ways to use this expected sensor latency. 66 For both methods, the latency should be in the same unit as the sensor 67 data timestamp. 1. As a single float number, the observation will be a 68 1D array. For real robots, this should be set to 0.0. 2. As a array of 69 floats, the observation will be a 2D array based on how long the history 70 need to be. Thus, [0.0, 0.1, 0.2] is a history length of 3. 71 observe_sine_cosine: whether to observe motor angles as sine and cosine 72 values. 73 """ 74 super().__init__( 75 name=name, 76 sensor_latency=sensor_latency, 77 interpolator_fn=sensor.linear_obs_blender) 78 self._noise_generator = noise_generator 79 self._dtype = dtype 80 self._lower_bound = lower_bound 81 self._upper_bound = upper_bound 82 self._observe_sine_cosine = observe_sine_cosine 83 84 def set_robot(self, robot): 85 self._robot = robot 86 # Creates the observation space based on the robot motor limitations. 87 if self._observe_sine_cosine: 88 lower_bound = _convert_to_np_array(-1, 2 * self._robot.num_motors) 89 elif self._lower_bound: 90 lower_bound = _convert_to_np_array(self._lower_bound, 91 self._robot.num_motors) 92 else: 93 lower_bound = _convert_to_np_array( 94 self._robot.motor_limits.angle_lower_limits, self._robot.num_motors) 95 if self._observe_sine_cosine: 96 upper_bound = _convert_to_np_array(1, 2 * self._robot.num_motors) 97 elif self._upper_bound: 98 upper_bound = _convert_to_np_array(self._upper_bound, 99 self._robot.num_motors) 100 else: 101 upper_bound = _convert_to_np_array( 102 self._robot.motor_limits.angle_upper_limits, self._robot.num_motors) 103 104 self._observation_space = self._stack_space( 105 gym.spaces.Box(low=lower_bound, high=upper_bound, dtype=self._dtype)) 106 107 def _get_original_observation(self): 108 if self._observe_sine_cosine: 109 return self._robot.timestamp, np.hstack( 110 (np.cos(self._robot.motor_angles), np.sin(self._robot.motor_angles))) 111 else: 112 return self._robot.timestamp, self._robot.motor_angles 113 114 def get_observation(self) -> np.ndarray: 115 delayed_observation = super().get_observation() 116 if self._noise_generator: 117 if callable(self._noise_generator): 118 return self._noise_generator(delayed_observation) 119 else: 120 return self._noise_generator.add_noise(delayed_observation) 121 122 return delayed_observation 123