"""Tests for metrics.py."""
2
import io
import math
import os
import tempfile
import time

from pytype import metrics

import unittest
11
12
class MetricsTest(unittest.TestCase):
  """Covers the shared metrics plumbing together with the Counter metric."""

  def setUp(self):
    super().setUp()
    # Reset the module-level metrics state before each test.
    metrics._prepare_for_test()

  def test_name_collision(self):
    """Creating a second Counter under a used name raises ValueError."""
    metrics.Counter("foo")
    with self.assertRaises(ValueError):
      metrics.Counter("foo")

  def test_valid_name(self):
    """Names shaped like Python identifiers are accepted."""
    for name in ("abc", "a_b_c", "abc1", "_abc"):
      metrics.Counter(name)

  def test_invalid_name(self):
    """Empty or non-identifier names raise ValueError."""
    for name in ("", "a-b", "a b", "123"):
      with self.assertRaises(ValueError):
        metrics.Counter(name)

  def test_get_report(self):
    """The report lists one "name: value" line per metric, 'bar' first."""
    metrics.Counter("foo").inc(2)
    metrics.Counter("bar").inc(123)
    expected_report = "bar: 123\nfoo: 2\n"
    self.assertEqual(expected_report, metrics.get_report())

  def test_counter(self):
    """Counter accumulates non-negative increments and rejects negatives."""
    counter = metrics.Counter("foo")
    self.assertEqual(0, counter._total)
    self.assertEqual("foo: 0", str(counter))
    counter.inc()
    self.assertEqual(1, counter._total)
    counter.inc(6)
    self.assertEqual(7, counter._total)
    # Incrementing by zero is allowed and leaves the total untouched.
    counter.inc(0)
    self.assertEqual(7, counter._total)
    self.assertEqual("foo: 7", str(counter))
    with self.assertRaises(ValueError):
      counter.inc(-1)

  def test_counter_disabled(self):
    """With metrics disabled, inc() leaves the total at zero."""
    metrics._prepare_for_test(enabled=False)
    counter = metrics.Counter("foo")
    counter.inc()
    self.assertEqual(0, counter._total)

  def test_merge_from(self):
    """merge_from_file creates missing metrics and adds to existing ones."""
    # Dump a counter holding the value 1 into an in-memory file.
    source = metrics.Counter("foo")
    source.inc(1)
    buf = io.StringIO()
    metrics.dump_all([source], buf)
    # After a reset nothing is registered, so merging creates the metric.
    metrics._prepare_for_test()
    self.assertFalse(metrics._registered_metrics)
    buf.seek(0)
    metrics.merge_from_file(buf)
    merged = metrics._registered_metrics["foo"]
    self.assertEqual(1, merged._total)
    # A second merge adds the dumped data onto the now-existing metric.
    buf.seek(0)
    metrics.merge_from_file(buf)
    self.assertEqual(2, merged._total)
    # Merging into a metric of an incompatible type is an error.
    metrics._prepare_for_test()
    metrics.MapCounter("foo")
    buf.seek(0)
    with self.assertRaises(TypeError):
      metrics.merge_from_file(buf)

  def test_get_metric(self):
    """get_metric returns the same object when asked for the same name."""
    first = metrics.get_metric("foo", metrics.Counter)
    self.assertIsInstance(first, metrics.Counter)
    second = metrics.get_metric("foo", metrics.Counter)
    self.assertIs(first, second)
89
90
class ReentrantStopWatchTest(unittest.TestCase):
  """Tests for ReentrantStopWatch."""

  def setUp(self):
    super().setUp()
    # Reset metrics state exactly as the other test classes in this file do,
    # so these tests do not depend on whatever an earlier test left behind.
    metrics._prepare_for_test()

  def test_reentrant_stop_watch(self):
    """Entering the same watch twice (nested) leaves a positive _time."""
    watch = metrics.ReentrantStopWatch("watch1")
    with watch:
      with watch:
        time.sleep(0.01)
    self.assertGreater(watch._time, 0)

  def test_reentrant_stop_watch_merge(self):
    """_merge adds the other watch's _time onto the receiver's _time."""
    source = metrics.ReentrantStopWatch("watch2")
    target = metrics.ReentrantStopWatch("watch3")
    with source:
      with source:
        time.sleep(0.0025)

    with target:
      with target:
        time.sleep(0.0001)

    # Capture both readings before merging; the merge mutates `target`.
    expected = source._time + target._time
    target._merge(source)
    # Compare with a tolerance because the values are floating-point times.
    self.assertAlmostEqual(expected, target._time, delta=0.000001)
115
116
class StopWatchTest(unittest.TestCase):
  """Exercises the StopWatch metric."""

  def setUp(self):
    super().setUp()
    # Reset the module-level metrics state before each test.
    metrics._prepare_for_test()

  def test_stopwatch(self):
    """Using a StopWatch as a context manager yields a non-negative total."""
    watch = metrics.StopWatch("foo")
    with watch:
      pass
    self.assertGreaterEqual(watch._total, 0)

  def test_merge(self):
    """After merging, the total is at least each pre-merge total."""
    target = metrics.StopWatch("foo")
    source = metrics.StopWatch("bar")
    for watch in (target, source):
      with watch:
        pass
    # Snapshot both totals first; _merge mutates `target`.
    target_before = target._total
    source_before = source._total
    target._merge(source)
    merged_total = target._total
    self.assertGreaterEqual(merged_total, target_before)
    self.assertGreaterEqual(merged_total, source_before)

  def test_summary(self):
    """Both _summary() and str() produce strings."""
    watch = metrics.StopWatch("foo")
    with watch:
      pass
    summary = watch._summary()
    self.assertIsInstance(summary, str)
    rendered = str(watch)
    self.assertIsInstance(rendered, str)
150
151
class MapCounterTest(unittest.TestCase):
  """Exercises the MapCounter metric."""

  def setUp(self):
    super().setUp()
    # Reset the module-level metrics state before each test.
    metrics._prepare_for_test()

  def test_enabled(self):
    """Increments are tracked per key and summed into the total."""
    counter = metrics.MapCounter("foo")
    # A fresh map counter starts out with a zero total.
    self.assertEqual(0, counter._total)
    # Bump two keys (one of them twice) and verify totals and rendering.
    counter.inc("x")
    counter.inc("y", 2)
    counter.inc("x", 5)
    self.assertEqual(8, counter._total)
    expected_counts = {"x": 6, "y": 2}
    self.assertDictEqual(expected_counts, counter._counts)
    self.assertEqual("foo: 8 {x=6, y=2}", str(counter))

  def test_disabled(self):
    """With metrics disabled, inc() leaves the total at zero."""
    metrics._prepare_for_test(enabled=False)
    counter = metrics.MapCounter("foo")
    counter.inc("x")
    self.assertEqual(0, counter._total)

  def test_merge(self):
    """_merge adds the other counter's per-key counts and total."""
    target = metrics.MapCounter("foo")
    target.inc("x")
    target.inc("y", 2)
    # Cheat a little: the counter being merged in has a different name.
    source = metrics.MapCounter("other")
    source.inc("x")
    source.inc("z")
    target._merge(source)
    # Verify the combined contents.
    self.assertEqual(5, target._total)
    expected_counts = {"x": 2, "y": 2, "z": 1}
    self.assertDictEqual(expected_counts, target._counts)
189
190
class DistributionTest(unittest.TestCase):
  """Tests for Distribution."""

  def setUp(self):
    super().setUp()
    # Reset the module-level metrics state before each test.
    metrics._prepare_for_test()

  def test_accumulation(self):
    """add() updates count, total, min, max, mean and stdev."""
    d = metrics.Distribution("foo")
    # Check contents of an empty distribution.
    self.assertEqual(0, d._count)
    self.assertEqual(0, d._total)
    self.assertIsNone(d._min)
    self.assertIsNone(d._max)
    self.assertIsNone(d._mean())
    self.assertIsNone(d._stdev())
    # Add some values.
    d.add(3)
    d.add(2)
    d.add(5)
    # Check the final contents.
    self.assertEqual(3, d._count)
    self.assertEqual(10, d._total)
    self.assertEqual(2, d._min)
    self.assertEqual(5, d._max)
    self.assertAlmostEqual(10.0 / 3, d._mean())
    # Stddev should be sqrt(14/9).
    self.assertAlmostEqual(math.sqrt(14.0 / 9), d._stdev())

  def test_summary(self):
    """str() renders total, count, min, max, mean and stdev."""
    d = metrics.Distribution("foo")
    self.assertEqual(
        "foo: total=0.0, count=0, min=None, max=None, mean=None, stdev=None",
        str(d))
    # This test is delicate because it is checking the string output of
    # floating point calculations.  This specific data set was chosen because
    # the number of samples is a power of two (thus the division is exact) and
    # the variance is a natural square (thus the sqrt() is exact).
    d.add(1)
    d.add(5)
    self.assertEqual(
        "foo: total=6.0, count=2, min=1, max=5, mean=3.0, stdev=2.0",
        str(d))

  def test_disabled(self):
    """With metrics disabled, add() leaves the count at zero."""
    metrics._prepare_for_test(enabled=False)
    d = metrics.Distribution("foo")
    d.add(123)
    self.assertEqual(0, d._count)

  def test_merge(self):
    """_merge combines count, total, squared sum, min and max."""
    d = metrics.Distribution("foo")
    # Merge two empty metrics together.
    other = metrics.Distribution("d_empty")
    d._merge(other)
    self.assertEqual(0, d._count)
    self.assertEqual(0, d._total)
    self.assertEqual(0, d._squared)
    self.assertIsNone(d._min)
    self.assertIsNone(d._max)
    # Merge into an empty metric (verifies the case where min/max must be
    # copied directly from the merged metric).
    other = metrics.Distribution("d2")
    other.add(10)
    other.add(20)
    d._merge(other)
    self.assertEqual(2, d._count)
    self.assertEqual(30, d._total)
    self.assertEqual(500, d._squared)
    self.assertEqual(10, d._min)
    self.assertEqual(20, d._max)
    # Merge into an existing metric resulting in a new min.
    other = metrics.Distribution("d3")
    other.add(5)
    d._merge(other)
    self.assertEqual(3, d._count)
    self.assertEqual(35, d._total)
    self.assertEqual(525, d._squared)
    self.assertEqual(5, d._min)
    self.assertEqual(20, d._max)
    # Merge into an existing metric resulting in a new max.
    other = metrics.Distribution("d4")
    other.add(30)
    d._merge(other)
    self.assertEqual(4, d._count)
    self.assertEqual(65, d._total)
    self.assertEqual(1425, d._squared)
    self.assertEqual(5, d._min)
    self.assertEqual(30, d._max)
    # Merge an empty metric (sloppy min/max code would fail).
    other = metrics.Distribution("d5")
    d._merge(other)
    self.assertEqual(4, d._count)
    self.assertEqual(65, d._total)
    self.assertEqual(1425, d._squared)
    self.assertEqual(5, d._min)
    self.assertEqual(30, d._max)
288
289
class MetricsContextTest(unittest.TestCase):
  """Tests for MetricsContext."""

  def setUp(self):
    super().setUp()
    # Start with metrics switched off so each test observes what
    # MetricsContext itself does to the counter.
    metrics._prepare_for_test(False)
    self._counter = metrics.Counter("foo")

  def test_enabled(self):
    """With an output path, increments count and the metrics are dumped."""
    # Write into a private temporary directory: the dump file is then removed
    # with the directory, and no already-released temp-file name is reused.
    with tempfile.TemporaryDirectory() as tmp_dir:
      out_path = os.path.join(tmp_dir, "metrics.out")
      with metrics.MetricsContext(out_path):
        self._counter.inc()
      self.assertEqual(1, self._counter._total)
      with open(out_path) as f:
        dumped = metrics.load_all(f)
      self.assertEqual(1, len(dumped))
      self.assertEqual("foo", dumped[0].name)
      self.assertEqual("foo: 1", str(dumped[0]))

  def test_disabled(self):
    """With an empty output path, inc() leaves the total at zero."""
    with metrics.MetricsContext(""):
      self._counter.inc()
    self.assertEqual(0, self._counter._total)
314
315
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
  unittest.main()
318