1# Lint as: python3
2"""The on robot sensor classes."""
3
4from typing import Any, Callable, Iterable, Optional, Sequence, Type, Text, Tuple, Union
5
6import gin
7import gym
8import numpy as np
9
10from pybullet_envs.minitaur.envs_v2.sensors import sensor
11
12from pybullet_envs.minitaur.envs_v2.utilities import noise_generators
13
14
15def _convert_to_np_array(inputs: Union[float, Tuple[float], np.ndarray], dim):
16  """Converts the inputs to a numpy array.
17
18  Args:
19    inputs: The input scalar or array.
20    dim: The dimension of the converted numpy array.
21
22  Returns:
23    The converted numpy array.
24
25  Raises:
26    ValueError: If the inputs is an array whose dimension does not match the
27    provided dimension.
28  """
29  outputs = None
30  if isinstance(inputs, (tuple, np.ndarray)):
31    outputs = np.array(inputs)
32  else:
33    outputs = np.full(dim, inputs)
34
35  if len(outputs) != dim:
36    raise ValueError("The inputs array has a different dimension {}"
37                     " than provided, which is {}.".format(len(outputs), dim))
38
39  return outputs
40
41
42@gin.configurable
43class MotorAngleSensor(sensor.Sensor):
44  """A sensor that reads motor angles from the robot."""
45
46  def __init__(self,
47               name: Text = "MotorAngle",
48               dtype: Type[Any] = np.float64,
49               lower_bound: Optional[Union[float, Iterable[float]]] = None,
50               upper_bound: Optional[Union[float, Iterable[float]]] = None,
51               noise_generator: Union[Callable[..., Any],
52                                      noise_generators.NoiseGenerator] = None,
53               sensor_latency: Union[float, Sequence[float]] = 0.0,
54               observe_sine_cosine: bool = False):
55    """Initializes the class.
56
57    Args:
58      name: The name of the sensor.
59      dtype: The datatype of this sensor.
60      lower_bound: The optional lower bounds of the sensor reading. If not
61        provided, will extract from the motor limits of the robot class.
62      upper_bound: The optional upper bounds of the sensor reading. If not
63        provided, will extract from the motor limits of the robot class.
64      noise_generator: Adds noise to the sensor readings.
65      sensor_latency: There are two ways to use this expected sensor latency.
66        For both methods, the latency should be in the same unit as the sensor
67        data timestamp. 1. As a single float number, the observation will be a
68        1D array. For real robots, this should be set to 0.0. 2. As a array of
69        floats, the observation will be a 2D array based on how long the history
70        need to be. Thus, [0.0, 0.1, 0.2] is a history length of 3.
71       observe_sine_cosine: whether to observe motor angles as sine and cosine
72         values.
73    """
74    super().__init__(
75        name=name,
76        sensor_latency=sensor_latency,
77        interpolator_fn=sensor.linear_obs_blender)
78    self._noise_generator = noise_generator
79    self._dtype = dtype
80    self._lower_bound = lower_bound
81    self._upper_bound = upper_bound
82    self._observe_sine_cosine = observe_sine_cosine
83
84  def set_robot(self, robot):
85    self._robot = robot
86    # Creates the observation space based on the robot motor limitations.
87    if self._observe_sine_cosine:
88      lower_bound = _convert_to_np_array(-1, 2 * self._robot.num_motors)
89    elif self._lower_bound:
90      lower_bound = _convert_to_np_array(self._lower_bound,
91                                         self._robot.num_motors)
92    else:
93      lower_bound = _convert_to_np_array(
94          self._robot.motor_limits.angle_lower_limits, self._robot.num_motors)
95    if self._observe_sine_cosine:
96      upper_bound = _convert_to_np_array(1, 2 * self._robot.num_motors)
97    elif self._upper_bound:
98      upper_bound = _convert_to_np_array(self._upper_bound,
99                                         self._robot.num_motors)
100    else:
101      upper_bound = _convert_to_np_array(
102          self._robot.motor_limits.angle_upper_limits, self._robot.num_motors)
103
104    self._observation_space = self._stack_space(
105        gym.spaces.Box(low=lower_bound, high=upper_bound, dtype=self._dtype))
106
107  def _get_original_observation(self):
108    if self._observe_sine_cosine:
109      return self._robot.timestamp, np.hstack(
110          (np.cos(self._robot.motor_angles), np.sin(self._robot.motor_angles)))
111    else:
112      return self._robot.timestamp, self._robot.motor_angles
113
114  def get_observation(self) -> np.ndarray:
115    delayed_observation = super().get_observation()
116    if self._noise_generator:
117      if callable(self._noise_generator):
118        return self._noise_generator(delayed_observation)
119      else:
120        return self._noise_generator.add_noise(delayed_observation)
121
122    return delayed_observation
123