1# Lint as: python3
2"""Quadruped toe position sensor."""
3
4from typing import Any, Callable, Sequence, Text, Tuple, Type, Union
5
6import gin
7import gym
8import numpy as np
9
10from pybullet_envs.minitaur.envs_v2.sensors import sensor
11from pybullet_envs.minitaur.envs_v2.utilities import noise_generators
12
13
14def _convert_to_np_array(inputs: Union[float, Tuple[float], np.ndarray], dim):
15  """Converts the inputs to a numpy array.
16
17  Args:
18    inputs: The input scalar or array.
19    dim: The dimension of the converted numpy array.
20
21  Returns:
22    The converted numpy array.
23
24  Raises:
25    ValueError: If the inputs is an array whose dimension does not match the
26    provided dimension.
27  """
28  outputs = None
29  if isinstance(inputs, (tuple, np.ndarray)):
30    outputs = np.array(inputs)
31  else:
32    outputs = np.full(dim, inputs)
33
34  if len(outputs) != dim:
35    raise ValueError("The inputs array has a different dimension {}"
36                     " than provided, which is {}.".format(len(outputs), dim))
37
38  return outputs
39
40
41@gin.configurable
42class ToePositionSensor(sensor.Sensor):
43  """A sensor that outputs the toe positions of attached robots or objects."""
44
45  def __init__(
46      self,
47      name: Text = "toe_position",
48      dtype: Type[Any] = np.float64,
49      lower_bound: Union[float, Sequence[float]] = -1.0,
50      upper_bound: Union[float, Sequence[float]] = 1.0,
51      noise_generator: Union[Callable[..., Any],
52                             noise_generators.NoiseGenerator] = None,
53      sensor_latency: Union[float, Sequence[float]] = 0.0,
54  ):
55    """Constructor.
56
57    Args:
58      name: Name of the sensor.
59      dtype: Data type of sensor value.
60      lower_bound: The optional lower bounds of the sensor reading.
61      upper_bound: The optional upper bounds of the sensor reading.
62      noise_generator: Used to add noise to the readings.
63      sensor_latency: Single or multiple latency in seconds. See sensor.Sensor
64        docstring for details.
65    """
66    super().__init__(
67        name=name,
68        sensor_latency=sensor_latency,
69        interpolator_fn=sensor.linear_obs_blender)
70    self._dtype = dtype
71    self._lower_bound = lower_bound
72    self._upper_bound = upper_bound
73    self._noise_generator = noise_generator
74
75  def set_robot(self, robot):
76    self._robot = robot
77    num_legs = len(robot.urdf_loader.get_end_effector_id_dict().values())
78    lower_bound = _convert_to_np_array(self._lower_bound, num_legs * 3)
79
80    upper_bound = _convert_to_np_array(self._upper_bound, num_legs * 3)
81
82    self._observation_space = self._stack_space(
83        gym.spaces.Box(low=lower_bound, high=upper_bound, dtype=self._dtype))
84
85  def _get_original_observation(self) -> Tuple[float, np.ndarray]:
86    """Returns raw observation with timestamp."""
87    toe_position = np.array(
88        self._robot.foot_positions(), dtype=self._dtype).flatten()
89
90    return self._robot.timestamp, toe_position
91
92  def get_observation(self) -> np.ndarray:
93    delayed_observation = super().get_observation()
94    if self._noise_generator:
95      if callable(self._noise_generator):
96        return self._noise_generator(delayed_observation)
97      else:
98        return self._noise_generator.add_noise(delayed_observation)
99
100    return delayed_observation
101