1# This Source Code Form is subject to the terms of the Mozilla Public 2# License, v. 2.0. If a copy of the MPL was not distributed with this 3# file, You can obtain one at http://mozilla.org/MPL/2.0/. 4 5import sys 6import time 7 8from marionette_driver import errors, wait 9from marionette_driver.wait import Wait 10 11from marionette_harness import MarionetteTestCase 12 13 14class TickingClock(object): 15 16 def __init__(self, incr=1): 17 self.ticks = 0 18 self.increment = incr 19 20 def sleep(self, dur=None): 21 dur = dur if dur is not None else self.increment 22 self.ticks += dur 23 24 @property 25 def now(self): 26 return self.ticks 27 28 29class SequenceClock(object): 30 31 def __init__(self, times): 32 self.times = times 33 self.i = 0 34 35 @property 36 def now(self): 37 if len(self.times) > self.i: 38 self.i += 1 39 return self.times[self.i - 1] 40 41 def sleep(self, dur): 42 pass 43 44 45class MockMarionette(object): 46 47 def __init__(self): 48 self.waited = 0 49 50 def exception(self, e=None, wait=1): 51 self.wait() 52 if self.waited == wait: 53 if e is None: 54 e = Exception 55 raise e 56 57 def true(self, wait=1): 58 self.wait() 59 if self.waited == wait: 60 return True 61 return None 62 63 def false(self, wait=1): 64 self.wait() 65 return False 66 67 def none(self, wait=1): 68 self.wait() 69 return None 70 71 def value(self, value, wait=1): 72 self.wait() 73 if self.waited == wait: 74 return value 75 return None 76 77 def wait(self): 78 self.waited += 1 79 80 81def at_third_attempt(clock, end): 82 return clock.now == 2 83 84 85def now(clock, end): 86 return True 87 88 89class SystemClockTest(MarionetteTestCase): 90 91 def setUp(self): 92 super(SystemClockTest, self).setUp() 93 self.clock = wait.SystemClock() 94 95 def test_construction_initializes_time(self): 96 self.assertEqual(self.clock._time, time) 97 98 def test_sleep(self): 99 start = time.time() 100 self.clock.sleep(0.1) 101 end = time.time() - start 102 self.assertGreater(end, 0) 103 104 def test_time_now(self): 105 self.assertIsNotNone(self.clock.now) 106 107 108class FormalWaitTest(MarionetteTestCase): 109 110 def setUp(self): 111 super(FormalWaitTest, self).setUp() 112 self.m = MockMarionette() 113 self.m.timeout = 123 114 115 def test_construction_with_custom_timeout(self): 116 wt = Wait(self.m, timeout=42) 117 self.assertEqual(wt.timeout, 42) 118 119 def test_construction_with_custom_interval(self): 120 wt = Wait(self.m, interval=42) 121 self.assertEqual(wt.interval, 42) 122 123 def test_construction_with_custom_clock(self): 124 c = TickingClock(1) 125 wt = Wait(self.m, clock=c) 126 self.assertEqual(wt.clock, c) 127 128 def test_construction_with_custom_exception(self): 129 wt = Wait(self.m, ignored_exceptions=Exception) 130 self.assertIn(Exception, wt.exceptions) 131 self.assertEqual(len(wt.exceptions), 1) 132 133 def test_construction_with_custom_exception_list(self): 134 exc = [Exception, ValueError] 135 wt = Wait(self.m, ignored_exceptions=exc) 136 for e in exc: 137 self.assertIn(e, wt.exceptions) 138 self.assertEqual(len(wt.exceptions), len(exc)) 139 140 def test_construction_with_custom_exception_tuple(self): 141 exc = (Exception, ValueError) 142 wt = Wait(self.m, ignored_exceptions=exc) 143 for e in exc: 144 self.assertIn(e, wt.exceptions) 145 self.assertEqual(len(wt.exceptions), len(exc)) 146 147 def test_duplicate_exceptions(self): 148 wt = Wait(self.m, ignored_exceptions=[Exception, Exception]) 149 self.assertIn(Exception, wt.exceptions) 150 self.assertEqual(len(wt.exceptions), 1) 151 152 def test_default_timeout(self): 153 self.assertEqual(wait.DEFAULT_TIMEOUT, 5) 154 155 def test_default_interval(self): 156 self.assertEqual(wait.DEFAULT_INTERVAL, 0.1) 157 158 def test_end_property(self): 159 wt = Wait(self.m) 160 self.assertIsNotNone(wt.end) 161 162 def test_marionette_property(self): 163 wt = Wait(self.m) 164 self.assertEqual(wt.marionette, self.m) 165 166 def test_clock_property(self): 167 wt = Wait(self.m) 168 self.assertIsInstance(wt.clock, wait.SystemClock) 169 170 def test_timeout_uses_default_if_marionette_timeout_is_none(self): 171 self.m.timeout = None 172 wt = Wait(self.m) 173 self.assertEqual(wt.timeout, wait.DEFAULT_TIMEOUT) 174 175 176class PredicatesTest(MarionetteTestCase): 177 178 def test_until(self): 179 c = wait.SystemClock() 180 self.assertFalse(wait.until_pred(c, sys.maxint)) 181 self.assertTrue(wait.until_pred(c, 0)) 182 183 184class WaitUntilTest(MarionetteTestCase): 185 186 def setUp(self): 187 super(WaitUntilTest, self).setUp() 188 189 self.m = MockMarionette() 190 self.clock = TickingClock() 191 self.wt = Wait(self.m, timeout=10, interval=1, clock=self.clock) 192 193 def test_true(self): 194 r = self.wt.until(lambda x: x.true()) 195 self.assertTrue(r) 196 self.assertEqual(self.clock.ticks, 0) 197 198 def test_true_within_timeout(self): 199 r = self.wt.until(lambda x: x.true(wait=5)) 200 self.assertTrue(r) 201 self.assertEqual(self.clock.ticks, 4) 202 203 def test_timeout(self): 204 with self.assertRaises(errors.TimeoutException): 205 r = self.wt.until(lambda x: x.true(wait=15)) 206 self.assertEqual(self.clock.ticks, 10) 207 208 def test_exception_raises_immediately(self): 209 with self.assertRaises(TypeError): 210 self.wt.until(lambda x: x.exception(e=TypeError)) 211 self.assertEqual(self.clock.ticks, 0) 212 213 def test_ignored_exception(self): 214 self.wt.exceptions = (TypeError,) 215 with self.assertRaises(errors.TimeoutException): 216 self.wt.until(lambda x: x.exception(e=TypeError)) 217 218 def test_ignored_exception_wrapped_in_timeoutexception(self): 219 self.wt.exceptions = (TypeError,) 220 221 exc = None 222 try: 223 self.wt.until(lambda x: x.exception(e=TypeError)) 224 except Exception as e: 225 exc = e 226 227 s = str(exc) 228 self.assertIsNotNone(exc) 229 self.assertIsInstance(exc, errors.TimeoutException) 230 self.assertIn(", caused by {0!r}".format(TypeError), s) 231 self.assertIn("self.wt.until(lambda x: x.exception(e=TypeError))", s) 232 233 def test_ignored_exception_after_timeout_is_not_raised(self): 234 with self.assertRaises(errors.TimeoutException): 235 r = self.wt.until(lambda x: x.exception(wait=15)) 236 self.assertEqual(self.clock.ticks, 10) 237 238 def test_keyboard_interrupt(self): 239 with self.assertRaises(KeyboardInterrupt): 240 self.wt.until(lambda x: x.exception(e=KeyboardInterrupt)) 241 242 def test_system_exit(self): 243 with self.assertRaises(SystemExit): 244 self.wt.until(lambda x: x.exception(SystemExit)) 245 246 def test_true_condition_returns_immediately(self): 247 r = self.wt.until(lambda x: x.true()) 248 self.assertIsInstance(r, bool) 249 self.assertTrue(r) 250 self.assertEqual(self.clock.ticks, 0) 251 252 def test_value(self): 253 r = self.wt.until(lambda x: "foo") 254 self.assertEqual(r, "foo") 255 self.assertEqual(self.clock.ticks, 0) 256 257 def test_custom_predicate(self): 258 r = self.wt.until(lambda x: x.true(wait=2), is_true=at_third_attempt) 259 self.assertTrue(r) 260 self.assertEqual(self.clock.ticks, 1) 261 262 def test_custom_predicate_times_out(self): 263 with self.assertRaises(errors.TimeoutException): 264 self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt) 265 266 self.assertEqual(self.clock.ticks, 2) 267 268 def test_timeout_elapsed_duration(self): 269 with self.assertRaisesRegexp(errors.TimeoutException, 270 "Timed out after 2.0 seconds"): 271 self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt) 272 273 def test_timeout_elapsed_rounding(self): 274 wt = Wait(self.m, clock=SequenceClock([1, 0.01, 1]), timeout=0) 275 with self.assertRaisesRegexp(errors.TimeoutException, 276 "Timed out after 1.0 seconds"): 277 wt.until(lambda x: x.true(), is_true=now) 278 279 def test_timeout_elapsed_interval_by_delayed_condition_return(self): 280 def callback(mn): 281 self.clock.sleep(11) 282 return mn.false() 283 284 with self.assertRaisesRegexp(errors.TimeoutException, 285 "Timed out after 11.0 seconds"): 286 self.wt.until(callback) 287 # With a delayed conditional return > timeout, only 1 iteration is 288 # possible 289 self.assertEqual(self.m.waited, 1) 290 291 def test_timeout_with_delayed_condition_return(self): 292 def callback(mn): 293 self.clock.sleep(.5) 294 return mn.false() 295 296 with self.assertRaisesRegexp(errors.TimeoutException, 297 "Timed out after 10.0 seconds"): 298 self.wt.until(callback) 299 # With a delayed conditional return < interval, 10 iterations should be 300 # possible 301 self.assertEqual(self.m.waited, 10) 302 303 def test_timeout_interval_shorter_than_delayed_condition_return(self): 304 def callback(mn): 305 self.clock.sleep(2) 306 return mn.false() 307 308 with self.assertRaisesRegexp(errors.TimeoutException, 309 "Timed out after 10.0 seconds"): 310 self.wt.until(callback) 311 # With a delayed return of the conditional which takes twice that long than the interval, 312 # half of the iterations should be possible 313 self.assertEqual(self.m.waited, 5) 314 315 def test_message(self): 316 self.wt.exceptions = (TypeError,) 317 exc = None 318 try: 319 self.wt.until(lambda x: x.exception(e=TypeError), message="hooba") 320 except errors.TimeoutException as e: 321 exc = e 322 323 result = str(exc) 324 self.assertIn("seconds with message: hooba, caused by", result) 325 326 def test_no_message(self): 327 self.wt.exceptions = (TypeError,) 328 exc = None 329 try: 330 self.wt.until(lambda x: x.exception(e=TypeError), message="") 331 except errors.TimeoutException as e: 332 exc = e 333 334 result = str(exc) 335 self.assertIn("seconds, caused by", result) 336 337 def test_message_has_none_as_its_value(self): 338 self.wt.exceptions = (TypeError,) 339 exc = None 340 try: 341 self.wt.until(False, None, None) 342 except errors.TimeoutException as e: 343 exc = e 344 345 result = str(exc) 346 self.assertNotIn("with message:", result) 347 self.assertNotIn("secondsNone", result) 348