1# Copyright 2018 The Cirq Developers
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""A typed time delta that supports picosecond accuracy."""
15
16from typing import AbstractSet, Any, Dict, Optional, Tuple, TYPE_CHECKING, Union
17import datetime
18
19import sympy
20
21from cirq import protocols
22from cirq._compat import proper_repr
23from cirq._doc import document
24
25if TYPE_CHECKING:
26    import cirq
27
28DURATION_LIKE = Union[None, datetime.timedelta, 'cirq.Duration']
29document(
30    DURATION_LIKE,  # type: ignore
31    """A `cirq.Duration` or value that can trivially converted to one.
32
33    A `datetime.timedelta` is a `cirq.DURATION_LIKE`. It is converted while
34    preserving its duration.
35
36    `None` is a `cirq.DURATION_LIKE` that converts into a zero-length duration.
37
38    Note that 0 is a `DURATION_LIKE`, despite the fact that `int` is not listed,
39    because 0 is the only integer where the physical unit doesn't matter.
40    """,
41)
42
43
44class Duration:
45    """A time delta that supports symbols and picosecond accuracy."""
46
47    # TODO(#3388) Add documentation for Raises.
48    # pylint: disable=missing-raises-doc
49    def __init__(
50        self,
51        value: DURATION_LIKE = None,
52        *,  # Force keyword args.
53        picos: Union[int, float, sympy.Basic] = 0,
54        nanos: Union[int, float, sympy.Basic] = 0,
55        micros: Union[int, float, sympy.Basic] = 0,
56        millis: Union[int, float, sympy.Basic] = 0,
57    ) -> None:
58        """Initializes a Duration with a time specified in some unit.
59
60        If multiple arguments are specified, their contributions are added.
61
62        Args:
63            value: A value with a pre-specified time unit. Currently only
64                supports 0 and `datetime.timedelta` instances.
65            picos: A number of picoseconds to add to the time delta.
66            nanos: A number of nanoseconds to add to the time delta.
67            micros: A number of microseconds to add to the time delta.
68            millis: A number of milliseconds to add to the time delta.
69
70        Examples:
71            >>> print(cirq.Duration(nanos=100))
72            100 ns
73            >>> print(cirq.Duration(micros=1.5 * sympy.Symbol('t')))
74            (1500.0*t) ns
75        """
76        if value is not None and value != 0:
77            if isinstance(value, datetime.timedelta):
78                # timedelta has microsecond resolution.
79                micros += int(value / datetime.timedelta(microseconds=1))
80            elif isinstance(value, Duration):
81                picos += value._picos
82            else:
83                raise TypeError(f'Not a `cirq.DURATION_LIKE`: {repr(value)}.')
84
85        self._picos: Union[float, int, sympy.Basic] = (
86            picos + nanos * 1000 + micros * 1000_000 + millis * 1000_000_000
87        )
88
89    # pylint: enable=missing-raises-doc
90    def _is_parameterized_(self) -> bool:
91        return protocols.is_parameterized(self._picos)
92
93    def _parameter_names_(self) -> AbstractSet[str]:
94        return protocols.parameter_names(self._picos)
95
96    def _resolve_parameters_(self, resolver: 'cirq.ParamResolver', recursive: bool) -> 'Duration':
97        return Duration(picos=protocols.resolve_parameters(self._picos, resolver, recursive))
98
99    def total_picos(self) -> Union[sympy.Basic, float]:
100        """Returns the number of picoseconds that the duration spans."""
101        return self._picos
102
103    def total_nanos(self) -> Union[sympy.Basic, float]:
104        """Returns the number of nanoseconds that the duration spans."""
105        return self._picos / 1000
106
107    def total_micros(self) -> Union[sympy.Basic, float]:
108        """Returns the number of microseconds that the duration spans."""
109        return self._picos / 1000_000
110
111    def total_millis(self) -> Union[sympy.Basic, float]:
112        """Returns the number of milliseconds that the duration spans."""
113        return self._picos / 1000_000_000
114
115    def __add__(self, other) -> 'Duration':
116        other = _attempt_duration_like_to_duration(other)
117        if other is None:
118            return NotImplemented
119        return Duration(picos=self._picos + other._picos)
120
121    def __radd__(self, other) -> 'Duration':
122        return self.__add__(other)
123
124    def __sub__(self, other) -> 'Duration':
125        other = _attempt_duration_like_to_duration(other)
126        if other is None:
127            return NotImplemented
128        return Duration(picos=self._picos - other._picos)
129
130    def __rsub__(self, other) -> 'Duration':
131        other = _attempt_duration_like_to_duration(other)
132        if other is None:
133            return NotImplemented
134        return Duration(picos=other._picos - self._picos)
135
136    def __mul__(self, other) -> 'Duration':
137        if not isinstance(other, (int, float, sympy.Basic)):
138            return NotImplemented
139        return Duration(picos=self._picos * other)
140
141    def __rmul__(self, other) -> 'Duration':
142        return self.__mul__(other)
143
144    def __truediv__(self, other) -> Union['Duration', float]:
145        if isinstance(other, (int, float, sympy.Basic)):
146            return Duration(picos=self._picos / other)
147
148        other_duration = _attempt_duration_like_to_duration(other)
149        if other_duration is not None:
150            return self._picos / other_duration._picos
151
152        return NotImplemented
153
154    def __eq__(self, other):
155        other = _attempt_duration_like_to_duration(other)
156        if other is None:
157            return NotImplemented
158        return self._picos == other._picos
159
160    def __ne__(self, other):
161        other = _attempt_duration_like_to_duration(other)
162        if other is None:
163            return NotImplemented
164        return self._picos != other._picos
165
166    def __gt__(self, other):
167        other = _attempt_duration_like_to_duration(other)
168        if other is None:
169            return NotImplemented
170        return self._picos > other._picos
171
172    def __lt__(self, other):
173        other = _attempt_duration_like_to_duration(other)
174        if other is None:
175            return NotImplemented
176        return self._picos < other._picos
177
178    def __ge__(self, other):
179        other = _attempt_duration_like_to_duration(other)
180        if other is None:
181            return NotImplemented
182        return self._picos >= other._picos
183
184    def __le__(self, other):
185        other = _attempt_duration_like_to_duration(other)
186        if other is None:
187            return NotImplemented
188        return self._picos <= other._picos
189
190    def __bool__(self):
191        return bool(self._picos)
192
193    def __hash__(self):
194        if isinstance(self._picos, (int, float)) and self._picos % 1000000 == 0:
195            return hash(datetime.timedelta(microseconds=self._picos / 1000000))
196        return hash((Duration, self._picos))
197
198    def _decompose_into_amount_unit_suffix(self) -> Tuple[int, str, str]:
199        if (
200            isinstance(self._picos, sympy.Mul)
201            and len(self._picos.args) == 2
202            and isinstance(self._picos.args[0], (sympy.Integer, sympy.Float))
203        ):
204            scale = self._picos.args[0]
205            rest = self._picos.args[1]
206        else:
207            scale = self._picos
208            rest = 1
209
210        if scale % 1000_000_000 == 0:
211            amount = scale / 1000_000_000
212            unit = 'millis'
213            suffix = 'ms'
214        elif scale % 1000_000 == 0:
215            amount = scale / 1000_000
216            unit = 'micros'
217            suffix = 'us'
218        elif scale % 1000 == 0:
219            amount = scale / 1000
220            unit = 'nanos'
221            suffix = 'ns'
222        else:
223            amount = scale
224            unit = 'picos'
225            suffix = 'ps'
226
227        if isinstance(scale, int):
228            amount = int(amount)
229
230        return amount * rest, unit, suffix
231
232    def __str__(self) -> str:
233        if self._picos == 0:
234            return 'Duration(0)'
235        amount, _, suffix = self._decompose_into_amount_unit_suffix()
236        if not isinstance(amount, (int, float, sympy.Symbol)):
237            amount = f'({amount})'
238        return f'{amount} {suffix}'
239
240    def __repr__(self) -> str:
241        amount, unit, _ = self._decompose_into_amount_unit_suffix()
242        return f'cirq.Duration({unit}={proper_repr(amount)})'
243
244    def _json_dict_(self) -> Dict[str, Any]:
245        return {'cirq_type': self.__class__.__name__, 'picos': self.total_picos()}
246
247
248def _attempt_duration_like_to_duration(value: Any) -> Optional[Duration]:
249    if isinstance(value, Duration):
250        return value
251    if isinstance(value, datetime.timedelta):
252        return Duration(value)
253    if isinstance(value, (int, float)) and value == 0:
254        return Duration()
255    return None
256