1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5import sys
6import time
7
8from marionette_driver import errors, wait
9from marionette_driver.wait import Wait
10
11from marionette_harness import MarionetteTestCase
12
13
class TickingClock(object):
    """Fake clock whose time only moves forward when ``sleep`` is called.

    ``ticks`` holds the current time and ``increment`` is the step used
    when ``sleep`` is called without an explicit duration.
    """

    def __init__(self, incr=1):
        self.increment = incr
        self.ticks = 0

    @property
    def now(self):
        """Current time of the clock, i.e. the accumulated ticks."""
        return self.ticks

    def sleep(self, dur=None):
        """Advance the clock by ``dur``, or by ``increment`` if ``dur`` is None."""
        if dur is None:
            dur = self.increment
        self.ticks += dur
27
28
class SequenceClock(object):
    """Fake clock that replays a fixed sequence of time values.

    Each read of ``now`` yields the next entry of ``times``; once the
    sequence is exhausted the last entry is returned on every read.
    """

    def __init__(self, times):
        self.i = 0
        self.times = times

    def sleep(self, dur):
        """Do nothing; time only changes when ``now`` is read."""
        pass

    @property
    def now(self):
        """Return the next value of the sequence, sticking at the last one."""
        remaining = len(self.times) - self.i
        if remaining > 0:
            # Still entries left: consume one.
            self.i += 1
        return self.times[self.i - 1]
43
44
class MockMarionette(object):
    """Stand-in driver object whose methods count how often they are called.

    Every condition helper first bumps ``waited`` via ``wait()``; several
    of them only produce their result on the call whose number equals the
    ``wait`` argument.
    """

    def __init__(self):
        self.waited = 0

    def wait(self):
        """Record one more call."""
        self.waited += 1

    def exception(self, e=None, wait=1):
        """Raise ``e`` (``Exception`` if None) on call number ``wait``."""
        self.wait()
        if self.waited != wait:
            return
        raise (Exception if e is None else e)

    def true(self, wait=1):
        """Return True on call number ``wait``, otherwise None."""
        self.wait()
        return True if self.waited == wait else None

    def false(self, wait=1):
        """Always return False; ``wait`` is accepted but not used."""
        self.wait()
        return False

    def none(self, wait=1):
        """Always return None; ``wait`` is accepted but not used."""
        self.wait()
        return None

    def value(self, value, wait=1):
        """Return ``value`` on call number ``wait``, otherwise None."""
        self.wait()
        return value if self.waited == wait else None
79
80
def at_third_attempt(clock, end):
    """Predicate that holds once the clock reads exactly 2.

    ``end`` is accepted to match the predicate signature but is not used.
    """
    current = clock.now
    return current == 2
83
84
def now(clock, end):
    """Predicate that always holds; both arguments are ignored."""
    return True
87
88
class SystemClockTest(MarionetteTestCase):
    """Basic checks of ``wait.SystemClock``."""

    def setUp(self):
        super(SystemClockTest, self).setUp()
        self.clock = wait.SystemClock()

    def test_construction_initializes_time(self):
        # The clock is expected to hold the stdlib ``time`` module.
        self.assertEqual(self.clock._time, time)

    def test_sleep(self):
        # Only checks that some wall-clock time passed while sleeping.
        began = time.time()
        self.clock.sleep(0.1)
        elapsed = time.time() - began
        self.assertGreater(elapsed, 0)

    def test_time_now(self):
        current = self.clock.now
        self.assertIsNotNone(current)
106
107
class FormalWaitTest(MarionetteTestCase):
    """Checks construction of ``Wait`` and the attributes it exposes."""

    def setUp(self):
        super(FormalWaitTest, self).setUp()
        mock = MockMarionette()
        mock.timeout = 123
        self.m = mock

    def test_construction_with_custom_timeout(self):
        waiter = Wait(self.m, timeout=42)
        self.assertEqual(waiter.timeout, 42)

    def test_construction_with_custom_interval(self):
        waiter = Wait(self.m, interval=42)
        self.assertEqual(waiter.interval, 42)

    def test_construction_with_custom_clock(self):
        clock = TickingClock(1)
        waiter = Wait(self.m, clock=clock)
        self.assertEqual(waiter.clock, clock)

    def test_construction_with_custom_exception(self):
        # A single exception class (not wrapped in a sequence) is accepted.
        waiter = Wait(self.m, ignored_exceptions=Exception)
        self.assertIn(Exception, waiter.exceptions)
        self.assertEqual(len(waiter.exceptions), 1)

    def test_construction_with_custom_exception_list(self):
        ignored = [Exception, ValueError]
        waiter = Wait(self.m, ignored_exceptions=ignored)
        for exc_type in ignored:
            self.assertIn(exc_type, waiter.exceptions)
        self.assertEqual(len(waiter.exceptions), len(ignored))

    def test_construction_with_custom_exception_tuple(self):
        ignored = (Exception, ValueError)
        waiter = Wait(self.m, ignored_exceptions=ignored)
        for exc_type in ignored:
            self.assertIn(exc_type, waiter.exceptions)
        self.assertEqual(len(waiter.exceptions), len(ignored))

    def test_duplicate_exceptions(self):
        # The same class given twice must be stored only once.
        waiter = Wait(self.m, ignored_exceptions=[Exception, Exception])
        self.assertIn(Exception, waiter.exceptions)
        self.assertEqual(len(waiter.exceptions), 1)

    def test_default_timeout(self):
        self.assertEqual(wait.DEFAULT_TIMEOUT, 5)

    def test_default_interval(self):
        self.assertEqual(wait.DEFAULT_INTERVAL, 0.1)

    def test_end_property(self):
        waiter = Wait(self.m)
        self.assertIsNotNone(waiter.end)

    def test_marionette_property(self):
        waiter = Wait(self.m)
        self.assertEqual(waiter.marionette, self.m)

    def test_clock_property(self):
        # Without an explicit clock a SystemClock is expected.
        waiter = Wait(self.m)
        self.assertIsInstance(waiter.clock, wait.SystemClock)

    def test_timeout_uses_default_if_marionette_timeout_is_none(self):
        self.m.timeout = None
        waiter = Wait(self.m)
        self.assertEqual(waiter.timeout, wait.DEFAULT_TIMEOUT)
174
175
class PredicatesTest(MarionetteTestCase):
    """Checks the ``wait.until_pred`` predicate against the system clock."""

    def test_until(self):
        c = wait.SystemClock()
        # sys.maxsize is available on both Python 2 and 3, whereas
        # sys.maxint only exists on Python 2. Either way it is an end
        # value far larger than the current clock reading.
        self.assertFalse(wait.until_pred(c, sys.maxsize))
        # An end value of 0 lies before the current clock reading.
        self.assertTrue(wait.until_pred(c, 0))
182
183
class WaitUntilTest(MarionetteTestCase):
    """Exercises ``Wait.until`` with a mock driver and a fake clock.

    The wait under test uses a timeout of 10 and an interval of 1 on a
    ``TickingClock``, so ``self.clock.ticks`` reflects how much fake time
    the wait has slept through.
    """

    def setUp(self):
        super(WaitUntilTest, self).setUp()

        self.m = MockMarionette()
        self.clock = TickingClock()
        self.wt = Wait(self.m, timeout=10, interval=1, clock=self.clock)

    def test_true(self):
        r = self.wt.until(lambda x: x.true())
        self.assertTrue(r)
        self.assertEqual(self.clock.ticks, 0)

    def test_true_within_timeout(self):
        # Condition turns true on the fifth call, i.e. after four sleeps.
        r = self.wt.until(lambda x: x.true(wait=5))
        self.assertTrue(r)
        self.assertEqual(self.clock.ticks, 4)

    def test_timeout(self):
        # Condition would only turn true after the timeout has elapsed.
        with self.assertRaises(errors.TimeoutException):
            self.wt.until(lambda x: x.true(wait=15))
        self.assertEqual(self.clock.ticks, 10)

    def test_exception_raises_immediately(self):
        # TypeError is not in the ignored exceptions here, so it propagates.
        with self.assertRaises(TypeError):
            self.wt.until(lambda x: x.exception(e=TypeError))
        self.assertEqual(self.clock.ticks, 0)

    def test_ignored_exception(self):
        self.wt.exceptions = (TypeError,)
        with self.assertRaises(errors.TimeoutException):
            self.wt.until(lambda x: x.exception(e=TypeError))

    def test_ignored_exception_wrapped_in_timeoutexception(self):
        self.wt.exceptions = (TypeError,)

        exc = None
        try:
            # NOTE: the last assertion below expects the source text of
            # this call to appear in the exception message, so keep the
            # call unchanged and on a single line.
            self.wt.until(lambda x: x.exception(e=TypeError))
        except Exception as e:
            exc = e

        s = str(exc)
        self.assertIsNotNone(exc)
        self.assertIsInstance(exc, errors.TimeoutException)
        self.assertIn(", caused by {0!r}".format(TypeError), s)
        self.assertIn("self.wt.until(lambda x: x.exception(e=TypeError))", s)

    def test_ignored_exception_after_timeout_is_not_raised(self):
        # The mock would raise on call 15, which is never reached.
        with self.assertRaises(errors.TimeoutException):
            self.wt.until(lambda x: x.exception(wait=15))
        self.assertEqual(self.clock.ticks, 10)

    def test_keyboard_interrupt(self):
        with self.assertRaises(KeyboardInterrupt):
            self.wt.until(lambda x: x.exception(e=KeyboardInterrupt))

    def test_system_exit(self):
        with self.assertRaises(SystemExit):
            self.wt.until(lambda x: x.exception(SystemExit))

    def test_true_condition_returns_immediately(self):
        r = self.wt.until(lambda x: x.true())
        self.assertIsInstance(r, bool)
        self.assertTrue(r)
        self.assertEqual(self.clock.ticks, 0)

    def test_value(self):
        r = self.wt.until(lambda x: "foo")
        self.assertEqual(r, "foo")
        self.assertEqual(self.clock.ticks, 0)

    def test_custom_predicate(self):
        r = self.wt.until(lambda x: x.true(wait=2), is_true=at_third_attempt)
        self.assertTrue(r)
        self.assertEqual(self.clock.ticks, 1)

    def test_custom_predicate_times_out(self):
        with self.assertRaises(errors.TimeoutException):
            self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt)

        self.assertEqual(self.clock.ticks, 2)

    def test_timeout_elapsed_duration(self):
        with self.assertRaisesRegexp(errors.TimeoutException,
                                     "Timed out after 2.0 seconds"):
            self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt)

    def test_timeout_elapsed_rounding(self):
        wt = Wait(self.m, clock=SequenceClock([1, 0.01, 1]), timeout=0)
        with self.assertRaisesRegexp(errors.TimeoutException,
                                     "Timed out after 1.0 seconds"):
            wt.until(lambda x: x.true(), is_true=now)

    def test_timeout_elapsed_interval_by_delayed_condition_return(self):
        def callback(mn):
            self.clock.sleep(11)
            return mn.false()

        with self.assertRaisesRegexp(errors.TimeoutException,
                                     "Timed out after 11.0 seconds"):
            self.wt.until(callback)
        # The condition takes longer than the timeout to return, so only
        # one iteration is possible.
        self.assertEqual(self.m.waited, 1)

    def test_timeout_with_delayed_condition_return(self):
        def callback(mn):
            self.clock.sleep(.5)
            return mn.false()

        with self.assertRaisesRegexp(errors.TimeoutException,
                                     "Timed out after 10.0 seconds"):
            self.wt.until(callback)
        # The condition returns faster than the interval, so 10 iterations
        # should be possible.
        self.assertEqual(self.m.waited, 10)

    def test_timeout_interval_shorter_than_delayed_condition_return(self):
        def callback(mn):
            self.clock.sleep(2)
            return mn.false()

        with self.assertRaisesRegexp(errors.TimeoutException,
                                     "Timed out after 10.0 seconds"):
            self.wt.until(callback)
        # The condition takes twice as long as the interval to return, so
        # only half of the iterations should be possible.
        self.assertEqual(self.m.waited, 5)

    def test_message(self):
        self.wt.exceptions = (TypeError,)
        # Using assertRaises makes the test fail clearly if no timeout
        # is raised, instead of inspecting str(None).
        with self.assertRaises(errors.TimeoutException) as cm:
            self.wt.until(lambda x: x.exception(e=TypeError), message="hooba")

        result = str(cm.exception)
        self.assertIn("seconds with message: hooba, caused by", result)

    def test_no_message(self):
        self.wt.exceptions = (TypeError,)
        with self.assertRaises(errors.TimeoutException) as cm:
            self.wt.until(lambda x: x.exception(e=TypeError), message="")

        result = str(cm.exception)
        self.assertIn("seconds, caused by", result)

    def test_message_has_none_as_its_value(self):
        self.wt.exceptions = (TypeError,)
        # ``False`` is not callable; presumably the resulting TypeError is
        # swallowed as an ignored exception until the wait times out
        # (verify against Wait.until). Requiring the TimeoutException
        # keeps the assertNotIn checks below from passing vacuously.
        with self.assertRaises(errors.TimeoutException) as cm:
            self.wt.until(False, None, None)

        result = str(cm.exception)
        self.assertNotIn("with message:", result)
        self.assertNotIn("secondsNone", result)
348