1import logging 2import pickle 3from datetime import datetime, timedelta 4from threading import Thread 5 6import pytest 7import six 8from pytz import utc 9 10from apscheduler.events import ( 11 EVENT_SCHEDULER_STARTED, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOBSTORE_ADDED, 12 EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_ALL_JOBS_REMOVED, EVENT_EXECUTOR_ADDED, 13 EVENT_EXECUTOR_REMOVED, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, EVENT_JOB_ADDED, 14 EVENT_JOB_EXECUTED, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_PAUSED, 15 EVENT_SCHEDULER_RESUMED, SchedulerEvent) 16from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError 17from apscheduler.executors.debug import DebugExecutor 18from apscheduler.job import Job 19from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError 20from apscheduler.jobstores.memory import MemoryJobStore 21from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError 22from apscheduler.schedulers.base import BaseScheduler, STATE_RUNNING, STATE_STOPPED 23from apscheduler.triggers.base import BaseTrigger 24from apscheduler.util import undefined 25 26try: 27 from StringIO import StringIO 28except ImportError: 29 from io import StringIO 30 31try: 32 from unittest.mock import MagicMock, patch 33except ImportError: 34 from mock import MagicMock, patch 35 36 37class DummyScheduler(BaseScheduler): 38 def __init__(self, *args, **kwargs): 39 super(DummyScheduler, self).__init__(*args, **kwargs) 40 self.wakeup = MagicMock() 41 42 def shutdown(self, wait=True): 43 super(DummyScheduler, self).shutdown(wait) 44 45 def wakeup(self): 46 pass 47 48 49class DummyTrigger(BaseTrigger): 50 def __init__(self, **args): 51 self.args = args 52 53 def get_next_fire_time(self, previous_fire_time, now): 54 pass 55 56 57class DummyExecutor(BaseExecutor): 58 def __init__(self, **args): 59 super(DummyExecutor, self).__init__() 60 self.args = args 61 self.start = MagicMock() 62 self.shutdown = MagicMock() 63 self.submit_job = MagicMock() 64 65 def _do_submit_job(self, job, run_times): 66 pass 67 68 69class DummyJobStore(BaseJobStore): 70 def __init__(self, **args): 71 super(DummyJobStore, self).__init__() 72 self.args = args 73 self.start = MagicMock() 74 self.shutdown = MagicMock() 75 76 def get_due_jobs(self, now): 77 pass 78 79 def lookup_job(self, job_id): 80 pass 81 82 def remove_job(self, job_id): 83 pass 84 85 def remove_all_jobs(self): 86 pass 87 88 def get_next_run_time(self): 89 pass 90 91 def get_all_jobs(self): 92 pass 93 94 def add_job(self, job): 95 pass 96 97 def update_job(self, job): 98 pass 99 100 101class TestBaseScheduler(object): 102 @pytest.fixture 103 def scheduler(self, timezone): 104 return DummyScheduler() 105 106 @pytest.fixture 107 def scheduler_events(self, request, scheduler): 108 events = [] 109 mask = getattr(request, 'param', EVENT_ALL ^ EVENT_SCHEDULER_STARTED) 110 scheduler.add_listener(events.append, mask) 111 return events 112 113 def test_constructor(self): 114 with patch('%s.DummyScheduler.configure' % __name__) as configure: 115 gconfig = {'apscheduler.foo': 'bar', 'apscheduler.x': 'y'} 116 options = {'bar': 'baz', 'xyz': 123} 117 DummyScheduler(gconfig, **options) 118 119 configure.assert_called_once_with(gconfig, **options) 120 121 @pytest.mark.parametrize('gconfig', [ 122 { 123 'apscheduler.timezone': 'UTC', 124 'apscheduler.job_defaults.misfire_grace_time': '5', 125 'apscheduler.job_defaults.coalesce': 'false', 126 'apscheduler.job_defaults.max_instances': '9', 127 'apscheduler.executors.default.class': '%s:DummyExecutor' % __name__, 128 'apscheduler.executors.default.arg1': '3', 129 'apscheduler.executors.default.arg2': 'a', 130 'apscheduler.executors.alter.class': '%s:DummyExecutor' % __name__, 131 'apscheduler.executors.alter.arg': 'true', 132 'apscheduler.jobstores.default.class': '%s:DummyJobStore' % __name__, 133 'apscheduler.jobstores.default.arg1': '3', 134 'apscheduler.jobstores.default.arg2': 'a', 135 'apscheduler.jobstores.bar.class': '%s:DummyJobStore' % __name__, 136 'apscheduler.jobstores.bar.arg': 'false', 137 }, 138 { 139 'apscheduler.timezone': 'UTC', 140 'apscheduler.job_defaults': { 141 'misfire_grace_time': '5', 142 'coalesce': 'false', 143 'max_instances': '9', 144 }, 145 'apscheduler.executors': { 146 'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'}, 147 'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'} 148 }, 149 'apscheduler.jobstores': { 150 'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'}, 151 'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'} 152 } 153 } 154 ], ids=['ini-style', 'yaml-style']) 155 def test_configure(self, scheduler, gconfig): 156 scheduler._configure = MagicMock() 157 scheduler.configure(gconfig, timezone='Other timezone') 158 159 scheduler._configure.assert_called_once_with({ 160 'timezone': 'Other timezone', 161 'job_defaults': { 162 'misfire_grace_time': '5', 163 'coalesce': 'false', 164 'max_instances': '9', 165 }, 166 'executors': { 167 'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'}, 168 'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'} 169 }, 170 'jobstores': { 171 'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'}, 172 'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'} 173 } 174 }) 175 176 @pytest.mark.parametrize('method', [ 177 BaseScheduler.configure, 178 BaseScheduler.start 179 ]) 180 def test_scheduler_already_running(self, method, scheduler): 181 """ 182 Test that SchedulerAlreadyRunningError is raised when certain methods are called before 183 the scheduler has been started. 184 185 """ 186 scheduler.start(paused=True) 187 pytest.raises(SchedulerAlreadyRunningError, method, scheduler) 188 189 @pytest.mark.parametrize('method', [ 190 BaseScheduler.pause, 191 BaseScheduler.resume, 192 BaseScheduler.shutdown 193 ], ids=['pause', 'resume', 'shutdown']) 194 def test_scheduler_not_running(self, scheduler, method): 195 """ 196 Test that the SchedulerNotRunningError is raised when certain methods are called before 197 the scheduler has been started. 198 199 """ 200 pytest.raises(SchedulerNotRunningError, method, scheduler) 201 202 def test_start(self, scheduler, create_job): 203 scheduler._executors = {'exec1': MagicMock(BaseExecutor), 'exec2': MagicMock(BaseExecutor)} 204 scheduler._jobstores = {'store1': MagicMock(BaseJobStore), 205 'store2': MagicMock(BaseJobStore)} 206 job = create_job(func=lambda: None) 207 scheduler._pending_jobs = [(job, 'store1', False)] 208 scheduler._real_add_job = MagicMock() 209 scheduler._dispatch_event = MagicMock() 210 scheduler.start() 211 212 scheduler._executors['exec1'].start.assert_called_once_with(scheduler, 'exec1') 213 scheduler._executors['exec2'].start.assert_called_once_with(scheduler, 'exec2') 214 scheduler._jobstores['store1'].start.assert_called_once_with(scheduler, 'store1') 215 scheduler._jobstores['store2'].start.assert_called_once_with(scheduler, 'store2') 216 assert len(scheduler._executors) == 3 217 assert len(scheduler._jobstores) == 3 218 assert 'default' in scheduler._executors 219 assert 'default' in scheduler._jobstores 220 221 scheduler._real_add_job.assert_called_once_with(job, 'store1', False) 222 assert scheduler._pending_jobs == [] 223 224 assert scheduler._dispatch_event.call_count == 3 225 event = scheduler._dispatch_event.call_args_list[0][0][0] 226 assert event.code == EVENT_EXECUTOR_ADDED 227 assert event.alias == 'default' 228 event = scheduler._dispatch_event.call_args_list[1][0][0] 229 assert event.code == EVENT_JOBSTORE_ADDED 230 assert event.alias == 'default' 231 event = scheduler._dispatch_event.call_args_list[2][0][0] 232 assert event.code == EVENT_SCHEDULER_STARTED 233 234 assert scheduler.state == STATE_RUNNING 235 236 @pytest.mark.parametrize('wait', [True, False], ids=['wait', 'nowait']) 237 def test_shutdown(self, scheduler, scheduler_events, wait): 238 executor = DummyExecutor() 239 jobstore = DummyJobStore() 240 scheduler.add_executor(executor) 241 scheduler.add_jobstore(jobstore) 242 scheduler.start(paused=True) 243 del scheduler_events[:] 244 scheduler.shutdown(wait) 245 246 assert scheduler.state == STATE_STOPPED 247 assert len(scheduler_events) == 1 248 assert scheduler_events[0].code == EVENT_SCHEDULER_SHUTDOWN 249 250 executor.shutdown.assert_called_once_with(wait) 251 jobstore.shutdown.assert_called_once_with() 252 253 def test_pause_resume(self, scheduler, scheduler_events): 254 scheduler.start() 255 del scheduler_events[:] 256 scheduler.wakeup.reset_mock() 257 258 scheduler.pause() 259 260 assert len(scheduler_events) == 1 261 assert scheduler_events[0].code == EVENT_SCHEDULER_PAUSED 262 assert not scheduler.wakeup.called 263 264 scheduler.resume() 265 assert len(scheduler_events) == 2 266 assert scheduler_events[1].code == EVENT_SCHEDULER_RESUMED 267 assert scheduler.wakeup.called 268 269 @pytest.mark.parametrize('start_scheduler', [True, False]) 270 def test_running(self, scheduler, start_scheduler): 271 if start_scheduler: 272 scheduler.start() 273 274 assert scheduler.running is start_scheduler 275 276 @pytest.mark.parametrize('start_scheduler', [True, False]) 277 def test_add_remove_executor(self, scheduler, scheduler_events, start_scheduler): 278 if start_scheduler: 279 scheduler.start(paused=True) 280 281 del scheduler_events[:] 282 executor = DummyExecutor() 283 scheduler.add_executor(executor, 'exec1') 284 285 assert len(scheduler_events) == 1 286 assert scheduler_events[0].code == EVENT_EXECUTOR_ADDED 287 assert scheduler_events[0].alias == 'exec1' 288 if start_scheduler: 289 executor.start.assert_called_once_with(scheduler, 'exec1') 290 else: 291 assert not executor.start.called 292 293 scheduler.remove_executor('exec1') 294 assert len(scheduler_events) == 2 295 assert scheduler_events[1].code == EVENT_EXECUTOR_REMOVED 296 assert scheduler_events[1].alias == 'exec1' 297 assert executor.shutdown.called 298 299 def test_add_executor_already_exists(self, scheduler): 300 executor = DummyExecutor() 301 scheduler.add_executor(executor) 302 exc = pytest.raises(ValueError, scheduler.add_executor, executor) 303 assert str(exc.value) == 'This scheduler already has an executor by the alias of "default"' 304 305 def test_remove_executor_nonexistent(self, scheduler): 306 pytest.raises(KeyError, scheduler.remove_executor, 'foo') 307 308 @pytest.mark.parametrize('start_scheduler', [True, False]) 309 def test_add_jobstore(self, scheduler, scheduler_events, start_scheduler): 310 """ 311 Test that the proper event is dispatched when a job store is added and the scheduler's 312 wake() method is called if the scheduler is running. 313 314 """ 315 if start_scheduler: 316 scheduler.start() 317 318 del scheduler_events[:] 319 jobstore = DummyJobStore() 320 scheduler.add_jobstore(jobstore, 'store1') 321 322 assert len(scheduler_events) == 1 323 assert scheduler_events[0].code == EVENT_JOBSTORE_ADDED 324 assert scheduler_events[0].alias == 'store1' 325 326 if start_scheduler: 327 assert scheduler.wakeup.called 328 jobstore.start.assert_called_once_with(scheduler, 'store1') 329 else: 330 assert not jobstore.start.called 331 332 def test_add_jobstore_already_exists(self, scheduler): 333 """ 334 Test that ValueError is raised when a job store is added with an alias that already exists. 335 336 """ 337 jobstore = MemoryJobStore() 338 scheduler.add_jobstore(jobstore) 339 exc = pytest.raises(ValueError, scheduler.add_jobstore, jobstore) 340 assert str(exc.value) == 'This scheduler already has a job store by the alias of "default"' 341 342 def test_remove_jobstore(self, scheduler, scheduler_events): 343 scheduler.add_jobstore(MemoryJobStore(), 'foo') 344 scheduler.remove_jobstore('foo') 345 346 assert len(scheduler_events) == 2 347 assert scheduler_events[1].code == EVENT_JOBSTORE_REMOVED 348 assert scheduler_events[1].alias == 'foo' 349 350 def test_remove_jobstore_nonexistent(self, scheduler): 351 pytest.raises(KeyError, scheduler.remove_jobstore, 'foo') 352 353 def test_add_remove_listener(self, scheduler): 354 """Test that event dispatch works but removed listeners aren't called.""" 355 events = [] 356 scheduler.add_listener(events.append, EVENT_EXECUTOR_ADDED) 357 scheduler.add_executor(DummyExecutor(), 'exec1') 358 scheduler.remove_listener(events.append) 359 scheduler.add_executor(DummyExecutor(), 'exec2') 360 assert len(events) == 1 361 362 def test_add_job_return_value(self, scheduler, timezone): 363 """Test that when a job is added to a stopped scheduler, a Job instance is returned.""" 364 job = scheduler.add_job(lambda x, y: None, 'date', [1], {'y': 2}, 'my-id', 'dummy', 365 next_run_time=datetime(2014, 5, 23, 10), 366 run_date='2014-06-01 08:41:00') 367 368 assert isinstance(job, Job) 369 assert job.id == 'my-id' 370 assert not hasattr(job, 'misfire_grace_time') 371 assert not hasattr(job, 'coalesce') 372 assert not hasattr(job, 'max_instances') 373 assert job.next_run_time.tzinfo.zone == timezone.zone 374 375 def test_add_job_pending(self, scheduler, scheduler_events): 376 """ 377 Test that when a job is added to a stopped scheduler, it is not added to a job store until 378 the scheduler is started and that the event is dispatched when that happens. 379 380 """ 381 scheduler.configure(job_defaults={ 382 'misfire_grace_time': 3, 'coalesce': False, 'max_instances': 6 383 }) 384 job = scheduler.add_job(lambda: None, 'interval', hours=1) 385 assert not scheduler_events 386 387 scheduler.start(paused=True) 388 389 assert len(scheduler_events) == 3 390 assert scheduler_events[2].code == EVENT_JOB_ADDED 391 assert scheduler_events[2].job_id is job.id 392 393 # Check that the undefined values were replaced with scheduler's job defaults 394 assert job.misfire_grace_time == 3 395 assert not job.coalesce 396 assert job.max_instances == 6 397 398 def test_add_job_id_conflict(self, scheduler): 399 """ 400 Test that if a job is added with an already existing id, ConflictingIdError is raised. 401 402 """ 403 scheduler.start(paused=True) 404 scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1) 405 pytest.raises(ConflictingIdError, scheduler.add_job, lambda: None, 'interval', 406 id='testjob', seconds=1) 407 408 def test_add_job_replace(self, scheduler): 409 """Test that with replace_existing=True, a new job replaces another with the same id.""" 410 scheduler.start(paused=True) 411 scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1) 412 scheduler.add_job(lambda: None, 'cron', id='testjob', name='replacement', 413 replace_existing=True) 414 jobs = scheduler.get_jobs() 415 assert len(jobs) == 1 416 assert jobs[0].name == 'replacement' 417 418 def test_scheduled_job(self, scheduler): 419 def func(x, y): 420 pass 421 422 scheduler.add_job = MagicMock() 423 decorator = scheduler.scheduled_job('date', [1], {'y': 2}, 'my-id', 424 'dummy', run_date='2014-06-01 08:41:00') 425 decorator(func) 426 427 scheduler.add_job.assert_called_once_with( 428 func, 'date', [1], {'y': 2}, 'my-id', 'dummy', undefined, undefined, undefined, 429 undefined, 'default', 'default', True, run_date='2014-06-01 08:41:00') 430 431 @pytest.mark.parametrize('pending', [True, False], ids=['pending job', 'scheduled job']) 432 def test_modify_job(self, scheduler, pending, timezone): 433 job = MagicMock() 434 scheduler._dispatch_event = MagicMock() 435 scheduler._lookup_job = MagicMock(return_value=(job, None if pending else 'default')) 436 if not pending: 437 jobstore = MagicMock() 438 scheduler._lookup_jobstore = lambda alias: jobstore if alias == 'default' else None 439 scheduler.modify_job('blah', misfire_grace_time=5, max_instances=2, 440 next_run_time=datetime(2014, 10, 17)) 441 442 job._modify.assert_called_once_with(misfire_grace_time=5, max_instances=2, 443 next_run_time=datetime(2014, 10, 17)) 444 if not pending: 445 jobstore.update_job.assert_called_once_with(job) 446 447 assert scheduler._dispatch_event.call_count == 1 448 event = scheduler._dispatch_event.call_args[0][0] 449 assert event.code == EVENT_JOB_MODIFIED 450 assert event.jobstore == (None if pending else 'default') 451 452 def test_reschedule_job(self, scheduler): 453 scheduler.modify_job = MagicMock() 454 trigger = MagicMock(get_next_fire_time=lambda previous, now: 1) 455 scheduler._create_trigger = MagicMock(return_value=trigger) 456 scheduler.reschedule_job('my-id', 'jobstore', 'date', run_date='2014-06-01 08:41:00') 457 458 assert scheduler.modify_job.call_count == 1 459 assert scheduler.modify_job.call_args[0] == ('my-id', 'jobstore') 460 assert scheduler.modify_job.call_args[1] == {'trigger': trigger, 'next_run_time': 1} 461 462 def test_pause_job(self, scheduler): 463 scheduler.modify_job = MagicMock() 464 scheduler.pause_job('job_id', 'jobstore') 465 466 scheduler.modify_job.assert_called_once_with('job_id', 'jobstore', next_run_time=None) 467 468 @pytest.mark.parametrize('dead_job', [True, False], ids=['dead job', 'live job']) 469 def test_resume_job(self, scheduler, freeze_time, dead_job): 470 next_fire_time = None if dead_job else freeze_time.current + timedelta(seconds=1) 471 trigger = MagicMock(BaseTrigger, get_next_fire_time=lambda prev, now: next_fire_time) 472 returned_job = MagicMock(Job, id='foo', trigger=trigger) 473 scheduler._lookup_job = MagicMock(return_value=(returned_job, 'bar')) 474 scheduler.modify_job = MagicMock() 475 scheduler.remove_job = MagicMock() 476 scheduler.resume_job('foo') 477 478 if dead_job: 479 scheduler.remove_job.assert_called_once_with('foo', 'bar') 480 else: 481 scheduler.modify_job.assert_called_once_with('foo', 'bar', 482 next_run_time=next_fire_time) 483 484 @pytest.mark.parametrize('scheduler_started', [True, False], ids=['running', 'stopped']) 485 @pytest.mark.parametrize('jobstore', [None, 'other'], 486 ids=['all jobstores', 'specific jobstore']) 487 def test_get_jobs(self, scheduler, scheduler_started, jobstore): 488 scheduler.add_jobstore(MemoryJobStore(), 'other') 489 scheduler.add_job(lambda: None, 'interval', seconds=1, id='job1') 490 scheduler.add_job(lambda: None, 'interval', seconds=1, id='job2', jobstore='other') 491 if scheduler_started: 492 scheduler.start(paused=True) 493 494 expected_job_ids = {'job2'} 495 if jobstore is None: 496 expected_job_ids.add('job1') 497 498 job_ids = {job.id for job in scheduler.get_jobs(jobstore)} 499 assert job_ids == expected_job_ids 500 501 @pytest.mark.parametrize('jobstore', [None, 'bar'], ids=['any jobstore', 'specific jobstore']) 502 def test_get_job(self, scheduler, jobstore): 503 returned_job = object() 504 scheduler._lookup_job = MagicMock(return_value=(returned_job, 'bar')) 505 job = scheduler.get_job('foo', jobstore) 506 507 assert job is returned_job 508 509 def test_get_job_nonexistent_job(self, scheduler): 510 scheduler._lookup_job = MagicMock(side_effect=JobLookupError('foo')) 511 assert scheduler.get_job('foo') is None 512 513 def test_get_job_nonexistent_jobstore(self, scheduler): 514 assert scheduler.get_job('foo', 'bar') is None 515 516 @pytest.mark.parametrize('start_scheduler', [True, False]) 517 @pytest.mark.parametrize('jobstore', [None, 'other'], 518 ids=['any jobstore', 'specific jobstore']) 519 def test_remove_job(self, scheduler, scheduler_events, start_scheduler, jobstore): 520 scheduler.add_jobstore(MemoryJobStore(), 'other') 521 scheduler.add_job(lambda: None, id='job1') 522 if start_scheduler: 523 scheduler.start(paused=True) 524 525 del scheduler_events[:] 526 if jobstore: 527 pytest.raises(JobLookupError, scheduler.remove_job, 'job1', jobstore) 528 assert len(scheduler.get_jobs()) == 1 529 assert len(scheduler_events) == 0 530 else: 531 scheduler.remove_job('job1', jobstore) 532 assert len(scheduler.get_jobs()) == 0 533 assert len(scheduler_events) == 1 534 assert scheduler_events[0].code == EVENT_JOB_REMOVED 535 536 def test_remove_nonexistent_job(self, scheduler): 537 pytest.raises(JobLookupError, scheduler.remove_job, 'foo') 538 539 @pytest.mark.parametrize('start_scheduler', [True, False]) 540 @pytest.mark.parametrize('jobstore', [None, 'other'], ids=['all', 'single jobstore']) 541 def test_remove_all_jobs(self, scheduler, start_scheduler, scheduler_events, jobstore): 542 """ 543 Test that remove_all_jobs() removes all jobs from all attached job stores, plus any 544 pending jobs. 545 546 """ 547 scheduler.add_jobstore(MemoryJobStore(), 'other') 548 scheduler.add_job(lambda: None, id='job1') 549 scheduler.add_job(lambda: None, id='job2') 550 scheduler.add_job(lambda: None, id='job3', jobstore='other') 551 if start_scheduler: 552 scheduler.start(paused=True) 553 554 del scheduler_events[:] 555 scheduler.remove_all_jobs(jobstore) 556 jobs = scheduler.get_jobs() 557 558 assert len(jobs) == (2 if jobstore else 0) 559 assert len(scheduler_events) == 1 560 assert scheduler_events[0].code == EVENT_ALL_JOBS_REMOVED 561 assert scheduler_events[0].alias == jobstore 562 563 @pytest.mark.parametrize('start_scheduler', [True, False]) 564 @pytest.mark.parametrize('jobstore', [None, 'other'], 565 ids=['all jobstores', 'specific jobstore']) 566 def test_print_jobs(self, scheduler, start_scheduler, jobstore): 567 scheduler.add_jobstore(MemoryJobStore(), 'other') 568 if start_scheduler: 569 scheduler.start(paused=True) 570 571 scheduler.add_job(lambda: None, 'date', run_date='2099-09-09', id='job1', 572 name='test job 1') 573 scheduler.add_job(lambda: None, 'date', run_date='2099-08-08', id='job2', 574 name='test job 2', jobstore='other') 575 576 outfile = StringIO() 577 scheduler.print_jobs(jobstore, outfile) 578 579 if jobstore and not start_scheduler: 580 assert outfile.getvalue() == """\ 581Pending jobs: 582 test job 2 (trigger: date[2099-08-08 00:00:00 CET], pending) 583""" 584 elif jobstore and start_scheduler: 585 assert outfile.getvalue() == """\ 586Jobstore other: 587 test job 2 (trigger: date[2099-08-08 00:00:00 CET], next run at: 2099-08-08 00:00:00 CET) 588""" 589 elif not jobstore and not start_scheduler: 590 assert outfile.getvalue() == """\ 591Pending jobs: 592 test job 1 (trigger: date[2099-09-09 00:00:00 CET], pending) 593 test job 2 (trigger: date[2099-08-08 00:00:00 CET], pending) 594""" 595 else: 596 assert outfile.getvalue() == """\ 597Jobstore default: 598 test job 1 (trigger: date[2099-09-09 00:00:00 CET], next run at: 2099-09-09 00:00:00 CET) 599Jobstore other: 600 test job 2 (trigger: date[2099-08-08 00:00:00 CET], next run at: 2099-08-08 00:00:00 CET) 601""" 602 603 @pytest.mark.parametrize('config', [ 604 { 605 'timezone': 'UTC', 606 'job_defaults': { 607 'misfire_grace_time': '5', 608 'coalesce': 'false', 609 'max_instances': '9', 610 }, 611 'executors': { 612 'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'}, 613 'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'} 614 }, 615 'jobstores': { 616 'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'}, 617 'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'} 618 } 619 }, 620 { 621 'timezone': utc, 622 'job_defaults': { 623 'misfire_grace_time': 5, 624 'coalesce': False, 625 'max_instances': 9, 626 }, 627 'executors': { 628 'default': DummyExecutor(arg1='3', arg2='a'), 629 'alter': DummyExecutor(arg='true') 630 }, 631 'jobstores': { 632 'default': DummyJobStore(arg1='3', arg2='a'), 633 'bar': DummyJobStore(arg='false') 634 } 635 } 636 ], ids=['references', 'instances']) 637 def test_configure_private(self, scheduler, config): 638 scheduler._configure(config) 639 640 assert scheduler.timezone is utc 641 assert scheduler._job_defaults == { 642 'misfire_grace_time': 5, 643 'coalesce': False, 644 'max_instances': 9 645 } 646 assert set(six.iterkeys(scheduler._executors)) == set(['default', 'alter']) 647 assert scheduler._executors['default'].args == {'arg1': '3', 'arg2': 'a'} 648 assert scheduler._executors['alter'].args == {'arg': 'true'} 649 assert set(six.iterkeys(scheduler._jobstores)) == set(['default', 'bar']) 650 assert scheduler._jobstores['default'].args == {'arg1': '3', 'arg2': 'a'} 651 assert scheduler._jobstores['bar'].args == {'arg': 'false'} 652 653 def test_configure_private_invalid_executor(self, scheduler): 654 exc = pytest.raises(TypeError, scheduler._configure, {'executors': {'default': 6}}) 655 assert str(exc.value) == ("Expected executor instance or dict for executors['default'], " 656 "got int instead") 657 658 def test_configure_private_invalid_jobstore(self, scheduler): 659 exc = pytest.raises(TypeError, scheduler._configure, {'jobstores': {'default': 6}}) 660 assert str(exc.value) == ("Expected job store instance or dict for jobstores['default'], " 661 "got int instead") 662 663 def test_create_default_executor(self, scheduler): 664 executor = scheduler._create_default_executor() 665 assert isinstance(executor, BaseExecutor) 666 667 def test_create_default_jobstore(self, scheduler): 668 store = scheduler._create_default_jobstore() 669 assert isinstance(store, BaseJobStore) 670 671 def test_lookup_executor(self, scheduler): 672 executor = object() 673 scheduler._executors = {'executor': executor} 674 assert scheduler._lookup_executor('executor') is executor 675 676 def test_lookup_executor_nonexistent(self, scheduler): 677 pytest.raises(KeyError, scheduler._lookup_executor, 'executor') 678 679 def test_lookup_jobstore(self, scheduler): 680 store = object() 681 scheduler._jobstores = {'store': store} 682 assert scheduler._lookup_jobstore('store') is store 683 684 def test_lookup_jobstore_nonexistent(self, scheduler): 685 pytest.raises(KeyError, scheduler._lookup_jobstore, 'store') 686 687 def test_dispatch_event(self, scheduler): 688 event = SchedulerEvent(1) 689 scheduler._listeners = [(MagicMock(), 2), (MagicMock(side_effect=Exception), 1), 690 (MagicMock(), 1)] 691 scheduler._dispatch_event(event) 692 693 assert not scheduler._listeners[0][0].called 694 scheduler._listeners[1][0].assert_called_once_with(event) 695 696 @pytest.mark.parametrize('load_plugin', [True, False], ids=['load plugin', 'plugin loaded']) 697 def test_create_trigger(self, scheduler, load_plugin): 698 """Tests that creating a trigger with an already loaded plugin works.""" 699 700 scheduler._trigger_plugins = {} 701 scheduler._trigger_classes = {} 702 if load_plugin: 703 scheduler._trigger_plugins['dummy'] = MagicMock( 704 load=MagicMock(return_value=DummyTrigger)) 705 else: 706 scheduler._trigger_classes['dummy'] = DummyTrigger 707 708 result = scheduler._create_trigger('dummy', {'a': 1, 'b': 'x'}) 709 710 assert isinstance(result, DummyTrigger) 711 assert result.args == {'a': 1, 'b': 'x', 'timezone': scheduler.timezone} 712 713 def test_create_trigger_instance(self, scheduler): 714 """Tests that passing a trigger instance will return the instance as-is.""" 715 trigger_instance = DummyTrigger() 716 717 assert scheduler._create_trigger(trigger_instance, {}) is trigger_instance 718 719 def test_create_trigger_default_type(self, scheduler): 720 """Tests that passing None as the trigger will create a "date" trigger instance.""" 721 scheduler._trigger_classes = {'date': DummyTrigger} 722 result = scheduler._create_trigger(None, {'a': 1}) 723 724 assert isinstance(result, DummyTrigger) 725 assert result.args == {'a': 1, 'timezone': scheduler.timezone} 726 727 def test_create_trigger_bad_trigger_type(self, scheduler): 728 exc = pytest.raises(TypeError, scheduler._create_trigger, 1, {}) 729 assert str(exc.value) == 'Expected a trigger instance or string, got int instead' 730 731 def test_create_trigger_bad_plugin_type(self, scheduler): 732 mock_plugin = MagicMock() 733 mock_plugin.load.configure_mock(return_value=object) 734 scheduler._trigger_classes = {} 735 scheduler._trigger_plugins = {'dummy': mock_plugin} 736 exc = pytest.raises(TypeError, scheduler._create_trigger, 'dummy', {}) 737 assert str(exc.value) == 'The trigger entry point does not point to a trigger class' 738 739 def test_create_trigger_nonexisting_plugin(self, scheduler): 740 exc = pytest.raises(LookupError, scheduler._create_trigger, 'dummy', {}) 741 assert str(exc.value) == 'No trigger by the name "dummy" was found' 742 743 def test_create_lock(self, scheduler): 744 lock = scheduler._create_lock() 745 assert hasattr(lock, '__enter__') 746 747 def test_process_jobs_empty(self, scheduler): 748 assert scheduler._process_jobs() is None 749 750 def test_job_submitted_event(self, scheduler, freeze_time): 751 events = [] 752 scheduler.add_job(lambda: None, run_date=freeze_time.get()) 753 scheduler.add_listener(events.append, EVENT_JOB_SUBMITTED) 754 scheduler.start() 755 scheduler._process_jobs() 756 757 assert len(events) == 1 758 assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)] 759 760 @pytest.mark.parametrize('scheduler_events', [EVENT_JOB_MAX_INSTANCES], 761 indirect=['scheduler_events']) 762 def test_job_max_instances_event(self, scheduler, scheduler_events, freeze_time): 763 class MaxedOutExecutor(DebugExecutor): 764 def submit_job(self, job, run_times): 765 raise MaxInstancesReachedError(job) 766 767 executor = MaxedOutExecutor() 768 scheduler.add_executor(executor, 'maxed') 769 scheduler.add_job(lambda: None, run_date=freeze_time.get(), executor='maxed') 770 scheduler.start() 771 scheduler._process_jobs() 772 773 assert len(scheduler_events) == 1 774 assert scheduler_events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)] 775 776 def test_serialize_scheduler(self, scheduler): 777 pytest.raises(TypeError, pickle.dumps, scheduler).match('Schedulers cannot be serialized') 778 779 780class TestProcessJobs(object): 781 @pytest.fixture 782 def job(self): 783 job = MagicMock(Job, id=999, executor='default') 784 job.trigger = MagicMock(get_next_fire_time=MagicMock(return_value=None)) 785 job. __str__ = lambda x: 'job 999' 786 return job 787 788 @pytest.fixture 789 def scheduler(self): 790 scheduler = DummyScheduler() 791 scheduler.start() 792 return scheduler 793 794 @pytest.fixture 795 def jobstore(self, scheduler, job): 796 jobstore = MagicMock(BaseJobStore, get_due_jobs=MagicMock(return_value=[job]), 797 get_next_run_time=MagicMock(return_value=None)) 798 scheduler._jobstores['default'] = jobstore 799 return jobstore 800 801 @pytest.fixture 802 def executor(self, scheduler): 803 executor = MagicMock(BaseExecutor) 804 scheduler._executors['default'] = executor 805 return executor 806 807 def test_nonexistent_executor(self, scheduler, jobstore, caplog): 808 """ 809 Test that an error is logged and the job is removed from its job store if its executor is 810 not found. 811 812 """ 813 caplog.setLevel(logging.ERROR) 814 scheduler.remove_executor('default') 815 assert scheduler._process_jobs() is None 816 jobstore.remove_job.assert_called_once_with(999) 817 assert len(caplog.records()) == 2 818 assert caplog.records()[1].message == \ 819 'Executor lookup ("default") failed for job "job 999" -- removing it from the job ' \ 820 'store' 821 822 def test_max_instances_reached(self, scheduler, job, jobstore, executor, caplog): 823 """Tests that a warning is logged when the maximum instances of a job is reached.""" 824 caplog.setLevel(logging.WARNING) 825 executor.submit_job = MagicMock(side_effect=MaxInstancesReachedError(job)) 826 827 assert scheduler._process_jobs() is None 828 assert len(caplog.records()) == 2 829 assert caplog.records()[1].message == \ 830 'Execution of job "job 999" skipped: maximum number of running instances reached (1)' 831 832 def test_executor_error(self, scheduler, jobstore, executor, caplog): 833 """Tests that if any exception is raised in executor.submit(), it is logged.""" 834 caplog.setLevel(logging.ERROR) 835 executor.submit_job = MagicMock(side_effect=Exception('test message')) 836 837 assert scheduler._process_jobs() is None 838 assert len(caplog.records()) == 2 839 assert 'test message' in caplog.records()[1].exc_text 840 assert 'Error submitting job "job 999" to executor "default"' in caplog.records()[1].message 841 842 def test_job_update(self, scheduler, job, jobstore, freeze_time): 843 """ 844 Tests that the job is updated in its job store with the next run time from the trigger. 845 846 """ 847 next_run_time = freeze_time.current + timedelta(seconds=6) 848 job.trigger.get_next_fire_time = MagicMock(return_value=next_run_time) 849 assert scheduler._process_jobs() is None 850 job._modify.assert_called_once_with(next_run_time=next_run_time) 851 jobstore.update_job.assert_called_once_with(job) 852 853 def test_wait_time(self, scheduler, freeze_time): 854 """ 855 Tests that the earliest next run time from all job stores is returned (ignoring Nones). 856 857 """ 858 scheduler._jobstores = { 859 'default': MagicMock(get_next_run_time=MagicMock( 860 return_value=freeze_time.current + timedelta(seconds=8))), 861 'alter': MagicMock(get_next_run_time=MagicMock(return_value=None)), 862 'another': MagicMock(get_next_run_time=MagicMock( 863 return_value=freeze_time.current + timedelta(seconds=5))), 864 'more': MagicMock(get_next_run_time=MagicMock( 865 return_value=freeze_time.current + timedelta(seconds=6))), 866 } 867 868 assert scheduler._process_jobs() == 5 869 870 871class SchedulerImplementationTestBase(object): 872 @pytest.fixture(autouse=True) 873 def executor(self, scheduler): 874 scheduler.add_executor(DebugExecutor()) 875 876 @pytest.fixture 877 def start_scheduler(self, request, scheduler): 878 yield scheduler.start 879 if scheduler.running: 880 scheduler.shutdown() 881 882 @pytest.fixture 883 def eventqueue(self, scheduler): 884 from six.moves.queue import Queue 885 events = Queue() 886 scheduler.add_listener(events.put) 887 return events 888 889 def wait_event(self, queue): 890 return queue.get(True, 1) 891 892 def test_add_pending_job(self, scheduler, freeze_time, eventqueue, start_scheduler): 893 """Tests that pending jobs are added (and if due, executed) when the scheduler starts.""" 894 freeze_time.set_increment(timedelta(seconds=0.2)) 895 scheduler.add_job(lambda x, y: x + y, 'date', args=[1, 2], run_date=freeze_time.next()) 896 start_scheduler() 897 898 assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED 899 assert self.wait_event(eventqueue).code == EVENT_JOB_ADDED 900 assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED 901 event = self.wait_event(eventqueue) 902 assert event.code == EVENT_JOB_EXECUTED 903 assert event.retval == 3 904 assert self.wait_event(eventqueue).code == EVENT_JOB_REMOVED 905 906 def test_add_live_job(self, scheduler, freeze_time, eventqueue, start_scheduler): 907 """Tests that adding a job causes it to be executed after the specified delay.""" 908 freeze_time.set_increment(timedelta(seconds=0.2)) 909 start_scheduler() 910 assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED 911 assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED 912 913 scheduler.add_job(lambda x, y: x + y, 'date', args=[1, 2], 914 run_date=freeze_time.next() + freeze_time.increment * 2) 915 assert self.wait_event(eventqueue).code == EVENT_JOB_ADDED 916 event = self.wait_event(eventqueue) 917 assert event.code == EVENT_JOB_EXECUTED 918 assert event.retval == 3 919 assert self.wait_event(eventqueue).code == EVENT_JOB_REMOVED 920 921 def test_shutdown(self, scheduler, eventqueue, start_scheduler): 922 """Tests that shutting down the scheduler emits the proper event.""" 923 start_scheduler() 924 assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED 925 assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED 926 927 scheduler.shutdown() 928 assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_SHUTDOWN 929 930 931class TestBlockingScheduler(SchedulerImplementationTestBase): 932 @pytest.fixture 933 def scheduler(self): 934 from apscheduler.schedulers.blocking import BlockingScheduler 935 return BlockingScheduler() 936 937 @pytest.fixture 938 def start_scheduler(self, request, scheduler): 939 thread = Thread(target=scheduler.start) 940 yield thread.start 941 942 if scheduler.running: 943 scheduler.shutdown() 944 thread.join() 945 946 947class TestBackgroundScheduler(SchedulerImplementationTestBase): 948 @pytest.fixture 949 def scheduler(self): 950 from apscheduler.schedulers.background import BackgroundScheduler 951 return BackgroundScheduler() 952 953 954class TestAsyncIOScheduler(SchedulerImplementationTestBase): 955 @pytest.fixture 956 def event_loop(self): 957 asyncio = pytest.importorskip('apscheduler.schedulers.asyncio') 958 return asyncio.asyncio.new_event_loop() 959 960 @pytest.fixture 961 def scheduler(self, event_loop): 962 asyncio = pytest.importorskip('apscheduler.schedulers.asyncio') 963 return asyncio.AsyncIOScheduler(event_loop=event_loop) 964 965 @pytest.fixture 966 def start_scheduler(self, request, event_loop, scheduler): 967 event_loop.call_soon_threadsafe(scheduler.start) 968 thread = Thread(target=event_loop.run_forever) 969 yield thread.start 970 971 if scheduler.running: 972 event_loop.call_soon_threadsafe(scheduler.shutdown) 973 event_loop.call_soon_threadsafe(event_loop.stop) 974 thread.join() 975 976 977class TestGeventScheduler(SchedulerImplementationTestBase): 978 @pytest.fixture 979 def scheduler(self): 980 gevent = pytest.importorskip('apscheduler.schedulers.gevent') 981 return gevent.GeventScheduler() 982 983 @pytest.fixture 984 def calc_event(self): 985 from gevent.event import Event 986 return Event() 987 988 @pytest.fixture 989 def eventqueue(self, scheduler): 990 from gevent.queue import Queue 991 events = Queue() 992 scheduler.add_listener(events.put) 993 return events 994 995 996class TestTornadoScheduler(SchedulerImplementationTestBase): 997 @pytest.fixture 998 def io_loop(self): 999 ioloop = pytest.importorskip('tornado.ioloop') 1000 return ioloop.IOLoop() 1001 1002 @pytest.fixture 1003 def scheduler(self, io_loop): 1004 tornado = pytest.importorskip('apscheduler.schedulers.tornado') 1005 return tornado.TornadoScheduler(io_loop=io_loop) 1006 1007 @pytest.fixture 1008 def start_scheduler(self, request, io_loop, scheduler): 1009 io_loop.add_callback(scheduler.start) 1010 thread = Thread(target=io_loop.start) 1011 yield thread.start 1012 1013 if scheduler.running: 1014 io_loop.add_callback(scheduler.shutdown) 1015 io_loop.add_callback(io_loop.stop) 1016 thread.join() 1017 1018 1019class TestTwistedScheduler(SchedulerImplementationTestBase): 1020 @pytest.fixture 1021 def reactor(self): 1022 selectreactor = pytest.importorskip('twisted.internet.selectreactor') 1023 return selectreactor.SelectReactor() 1024 1025 @pytest.fixture 1026 def scheduler(self, reactor): 1027 twisted = pytest.importorskip('apscheduler.schedulers.twisted') 1028 return twisted.TwistedScheduler(reactor=reactor) 1029 1030 @pytest.fixture 1031 def start_scheduler(self, request, reactor, scheduler): 1032 reactor.callFromThread(scheduler.start) 1033 thread = Thread(target=reactor.run, args=(False,)) 1034 yield thread.start 1035 1036 if scheduler.running: 1037 reactor.callFromThread(scheduler.shutdown) 1038 reactor.callFromThread(reactor.stop) 1039 thread.join() 1040 1041 1042@pytest.mark.skip 1043class TestQtScheduler(SchedulerImplementationTestBase): 1044 @pytest.fixture(scope='class') 1045 def coreapp(self): 1046 QtCore = pytest.importorskip('PySide.QtCore') 1047 QtCore.QCoreApplication([]) 1048 1049 @pytest.fixture 1050 def scheduler(self, coreapp): 1051 qt = pytest.importorskip('apscheduler.schedulers.qt') 1052 return qt.QtScheduler() 1053 1054 def wait_event(self, queue): 1055 from PySide.QtCore import QCoreApplication 1056 1057 while queue.empty(): 1058 QCoreApplication.processEvents() 1059 return queue.get_nowait() 1060