1import logging
2import pickle
3from datetime import datetime, timedelta
4from threading import Thread
5
6import pytest
7import six
8from pytz import utc
9
10from apscheduler.events import (
11    EVENT_SCHEDULER_STARTED, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOBSTORE_ADDED,
12    EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_ALL_JOBS_REMOVED, EVENT_EXECUTOR_ADDED,
13    EVENT_EXECUTOR_REMOVED, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, EVENT_JOB_ADDED,
14    EVENT_JOB_EXECUTED, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_PAUSED,
15    EVENT_SCHEDULER_RESUMED, SchedulerEvent)
16from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError
17from apscheduler.executors.debug import DebugExecutor
18from apscheduler.job import Job
19from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
20from apscheduler.jobstores.memory import MemoryJobStore
21from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
22from apscheduler.schedulers.base import BaseScheduler, STATE_RUNNING, STATE_STOPPED
23from apscheduler.triggers.base import BaseTrigger
24from apscheduler.util import undefined
25
26try:
27    from StringIO import StringIO
28except ImportError:
29    from io import StringIO
30
31try:
32    from unittest.mock import MagicMock, patch
33except ImportError:
34    from mock import MagicMock, patch
35
36
37class DummyScheduler(BaseScheduler):
38    def __init__(self, *args, **kwargs):
39        super(DummyScheduler, self).__init__(*args, **kwargs)
40        self.wakeup = MagicMock()
41
42    def shutdown(self, wait=True):
43        super(DummyScheduler, self).shutdown(wait)
44
45    def wakeup(self):
46        pass
47
48
49class DummyTrigger(BaseTrigger):
50    def __init__(self, **args):
51        self.args = args
52
53    def get_next_fire_time(self, previous_fire_time, now):
54        pass
55
56
57class DummyExecutor(BaseExecutor):
58    def __init__(self, **args):
59        super(DummyExecutor, self).__init__()
60        self.args = args
61        self.start = MagicMock()
62        self.shutdown = MagicMock()
63        self.submit_job = MagicMock()
64
65    def _do_submit_job(self, job, run_times):
66        pass
67
68
69class DummyJobStore(BaseJobStore):
70    def __init__(self, **args):
71        super(DummyJobStore, self).__init__()
72        self.args = args
73        self.start = MagicMock()
74        self.shutdown = MagicMock()
75
76    def get_due_jobs(self, now):
77        pass
78
79    def lookup_job(self, job_id):
80        pass
81
82    def remove_job(self, job_id):
83        pass
84
85    def remove_all_jobs(self):
86        pass
87
88    def get_next_run_time(self):
89        pass
90
91    def get_all_jobs(self):
92        pass
93
94    def add_job(self, job):
95        pass
96
97    def update_job(self, job):
98        pass
99
100
101class TestBaseScheduler(object):
102    @pytest.fixture
103    def scheduler(self, timezone):
104        return DummyScheduler()
105
106    @pytest.fixture
107    def scheduler_events(self, request, scheduler):
108        events = []
109        mask = getattr(request, 'param', EVENT_ALL ^ EVENT_SCHEDULER_STARTED)
110        scheduler.add_listener(events.append, mask)
111        return events
112
113    def test_constructor(self):
114        with patch('%s.DummyScheduler.configure' % __name__) as configure:
115            gconfig = {'apscheduler.foo': 'bar', 'apscheduler.x': 'y'}
116            options = {'bar': 'baz', 'xyz': 123}
117            DummyScheduler(gconfig, **options)
118
119        configure.assert_called_once_with(gconfig, **options)
120
121    @pytest.mark.parametrize('gconfig', [
122        {
123            'apscheduler.timezone': 'UTC',
124            'apscheduler.job_defaults.misfire_grace_time': '5',
125            'apscheduler.job_defaults.coalesce': 'false',
126            'apscheduler.job_defaults.max_instances': '9',
127            'apscheduler.executors.default.class': '%s:DummyExecutor' % __name__,
128            'apscheduler.executors.default.arg1': '3',
129            'apscheduler.executors.default.arg2': 'a',
130            'apscheduler.executors.alter.class': '%s:DummyExecutor' % __name__,
131            'apscheduler.executors.alter.arg': 'true',
132            'apscheduler.jobstores.default.class': '%s:DummyJobStore' % __name__,
133            'apscheduler.jobstores.default.arg1': '3',
134            'apscheduler.jobstores.default.arg2': 'a',
135            'apscheduler.jobstores.bar.class': '%s:DummyJobStore' % __name__,
136            'apscheduler.jobstores.bar.arg': 'false',
137        },
138        {
139            'apscheduler.timezone': 'UTC',
140            'apscheduler.job_defaults': {
141                'misfire_grace_time': '5',
142                'coalesce': 'false',
143                'max_instances': '9',
144            },
145            'apscheduler.executors': {
146                'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'},
147                'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'}
148            },
149            'apscheduler.jobstores': {
150                'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'},
151                'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'}
152            }
153        }
154    ], ids=['ini-style', 'yaml-style'])
155    def test_configure(self, scheduler, gconfig):
156        scheduler._configure = MagicMock()
157        scheduler.configure(gconfig, timezone='Other timezone')
158
159        scheduler._configure.assert_called_once_with({
160            'timezone': 'Other timezone',
161            'job_defaults': {
162                'misfire_grace_time': '5',
163                'coalesce': 'false',
164                'max_instances': '9',
165            },
166            'executors': {
167                'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'},
168                'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'}
169            },
170            'jobstores': {
171                'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'},
172                'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'}
173            }
174        })
175
176    @pytest.mark.parametrize('method', [
177        BaseScheduler.configure,
178        BaseScheduler.start
179    ])
180    def test_scheduler_already_running(self, method, scheduler):
181        """
182        Test that SchedulerAlreadyRunningError is raised when certain methods are called before
183        the scheduler has been started.
184
185        """
186        scheduler.start(paused=True)
187        pytest.raises(SchedulerAlreadyRunningError, method, scheduler)
188
189    @pytest.mark.parametrize('method', [
190        BaseScheduler.pause,
191        BaseScheduler.resume,
192        BaseScheduler.shutdown
193    ], ids=['pause', 'resume', 'shutdown'])
194    def test_scheduler_not_running(self, scheduler, method):
195        """
196        Test that the SchedulerNotRunningError is raised when certain methods are called before
197        the scheduler has been started.
198
199        """
200        pytest.raises(SchedulerNotRunningError, method, scheduler)
201
202    def test_start(self, scheduler, create_job):
203        scheduler._executors = {'exec1': MagicMock(BaseExecutor), 'exec2': MagicMock(BaseExecutor)}
204        scheduler._jobstores = {'store1': MagicMock(BaseJobStore),
205                                'store2': MagicMock(BaseJobStore)}
206        job = create_job(func=lambda: None)
207        scheduler._pending_jobs = [(job, 'store1', False)]
208        scheduler._real_add_job = MagicMock()
209        scheduler._dispatch_event = MagicMock()
210        scheduler.start()
211
212        scheduler._executors['exec1'].start.assert_called_once_with(scheduler, 'exec1')
213        scheduler._executors['exec2'].start.assert_called_once_with(scheduler, 'exec2')
214        scheduler._jobstores['store1'].start.assert_called_once_with(scheduler, 'store1')
215        scheduler._jobstores['store2'].start.assert_called_once_with(scheduler, 'store2')
216        assert len(scheduler._executors) == 3
217        assert len(scheduler._jobstores) == 3
218        assert 'default' in scheduler._executors
219        assert 'default' in scheduler._jobstores
220
221        scheduler._real_add_job.assert_called_once_with(job, 'store1', False)
222        assert scheduler._pending_jobs == []
223
224        assert scheduler._dispatch_event.call_count == 3
225        event = scheduler._dispatch_event.call_args_list[0][0][0]
226        assert event.code == EVENT_EXECUTOR_ADDED
227        assert event.alias == 'default'
228        event = scheduler._dispatch_event.call_args_list[1][0][0]
229        assert event.code == EVENT_JOBSTORE_ADDED
230        assert event.alias == 'default'
231        event = scheduler._dispatch_event.call_args_list[2][0][0]
232        assert event.code == EVENT_SCHEDULER_STARTED
233
234        assert scheduler.state == STATE_RUNNING
235
236    @pytest.mark.parametrize('wait', [True, False], ids=['wait', 'nowait'])
237    def test_shutdown(self, scheduler, scheduler_events, wait):
238        executor = DummyExecutor()
239        jobstore = DummyJobStore()
240        scheduler.add_executor(executor)
241        scheduler.add_jobstore(jobstore)
242        scheduler.start(paused=True)
243        del scheduler_events[:]
244        scheduler.shutdown(wait)
245
246        assert scheduler.state == STATE_STOPPED
247        assert len(scheduler_events) == 1
248        assert scheduler_events[0].code == EVENT_SCHEDULER_SHUTDOWN
249
250        executor.shutdown.assert_called_once_with(wait)
251        jobstore.shutdown.assert_called_once_with()
252
253    def test_pause_resume(self, scheduler, scheduler_events):
254        scheduler.start()
255        del scheduler_events[:]
256        scheduler.wakeup.reset_mock()
257
258        scheduler.pause()
259
260        assert len(scheduler_events) == 1
261        assert scheduler_events[0].code == EVENT_SCHEDULER_PAUSED
262        assert not scheduler.wakeup.called
263
264        scheduler.resume()
265        assert len(scheduler_events) == 2
266        assert scheduler_events[1].code == EVENT_SCHEDULER_RESUMED
267        assert scheduler.wakeup.called
268
269    @pytest.mark.parametrize('start_scheduler', [True, False])
270    def test_running(self, scheduler, start_scheduler):
271        if start_scheduler:
272            scheduler.start()
273
274        assert scheduler.running is start_scheduler
275
276    @pytest.mark.parametrize('start_scheduler', [True, False])
277    def test_add_remove_executor(self, scheduler, scheduler_events, start_scheduler):
278        if start_scheduler:
279            scheduler.start(paused=True)
280
281        del scheduler_events[:]
282        executor = DummyExecutor()
283        scheduler.add_executor(executor, 'exec1')
284
285        assert len(scheduler_events) == 1
286        assert scheduler_events[0].code == EVENT_EXECUTOR_ADDED
287        assert scheduler_events[0].alias == 'exec1'
288        if start_scheduler:
289            executor.start.assert_called_once_with(scheduler, 'exec1')
290        else:
291            assert not executor.start.called
292
293        scheduler.remove_executor('exec1')
294        assert len(scheduler_events) == 2
295        assert scheduler_events[1].code == EVENT_EXECUTOR_REMOVED
296        assert scheduler_events[1].alias == 'exec1'
297        assert executor.shutdown.called
298
299    def test_add_executor_already_exists(self, scheduler):
300        executor = DummyExecutor()
301        scheduler.add_executor(executor)
302        exc = pytest.raises(ValueError, scheduler.add_executor, executor)
303        assert str(exc.value) == 'This scheduler already has an executor by the alias of "default"'
304
305    def test_remove_executor_nonexistent(self, scheduler):
306        pytest.raises(KeyError, scheduler.remove_executor, 'foo')
307
308    @pytest.mark.parametrize('start_scheduler', [True, False])
309    def test_add_jobstore(self, scheduler, scheduler_events, start_scheduler):
310        """
311        Test that the proper event is dispatched when a job store is added and the scheduler's
312        wake() method is called if the scheduler is running.
313
314        """
315        if start_scheduler:
316            scheduler.start()
317
318        del scheduler_events[:]
319        jobstore = DummyJobStore()
320        scheduler.add_jobstore(jobstore, 'store1')
321
322        assert len(scheduler_events) == 1
323        assert scheduler_events[0].code == EVENT_JOBSTORE_ADDED
324        assert scheduler_events[0].alias == 'store1'
325
326        if start_scheduler:
327            assert scheduler.wakeup.called
328            jobstore.start.assert_called_once_with(scheduler, 'store1')
329        else:
330            assert not jobstore.start.called
331
332    def test_add_jobstore_already_exists(self, scheduler):
333        """
334        Test that ValueError is raised when a job store is added with an alias that already exists.
335
336        """
337        jobstore = MemoryJobStore()
338        scheduler.add_jobstore(jobstore)
339        exc = pytest.raises(ValueError, scheduler.add_jobstore, jobstore)
340        assert str(exc.value) == 'This scheduler already has a job store by the alias of "default"'
341
342    def test_remove_jobstore(self, scheduler, scheduler_events):
343        scheduler.add_jobstore(MemoryJobStore(), 'foo')
344        scheduler.remove_jobstore('foo')
345
346        assert len(scheduler_events) == 2
347        assert scheduler_events[1].code == EVENT_JOBSTORE_REMOVED
348        assert scheduler_events[1].alias == 'foo'
349
350    def test_remove_jobstore_nonexistent(self, scheduler):
351        pytest.raises(KeyError, scheduler.remove_jobstore, 'foo')
352
353    def test_add_remove_listener(self, scheduler):
354        """Test that event dispatch works but removed listeners aren't called."""
355        events = []
356        scheduler.add_listener(events.append, EVENT_EXECUTOR_ADDED)
357        scheduler.add_executor(DummyExecutor(), 'exec1')
358        scheduler.remove_listener(events.append)
359        scheduler.add_executor(DummyExecutor(), 'exec2')
360        assert len(events) == 1
361
362    def test_add_job_return_value(self, scheduler, timezone):
363        """Test that when a job is added to a stopped scheduler, a Job instance is returned."""
364        job = scheduler.add_job(lambda x, y: None, 'date', [1], {'y': 2}, 'my-id', 'dummy',
365                                next_run_time=datetime(2014, 5, 23, 10),
366                                run_date='2014-06-01 08:41:00')
367
368        assert isinstance(job, Job)
369        assert job.id == 'my-id'
370        assert not hasattr(job, 'misfire_grace_time')
371        assert not hasattr(job, 'coalesce')
372        assert not hasattr(job, 'max_instances')
373        assert job.next_run_time.tzinfo.zone == timezone.zone
374
375    def test_add_job_pending(self, scheduler, scheduler_events):
376        """
377        Test that when a job is added to a stopped scheduler, it is not added to a job store until
378        the scheduler is started and that the event is dispatched when that happens.
379
380        """
381        scheduler.configure(job_defaults={
382            'misfire_grace_time': 3, 'coalesce': False, 'max_instances': 6
383        })
384        job = scheduler.add_job(lambda: None, 'interval', hours=1)
385        assert not scheduler_events
386
387        scheduler.start(paused=True)
388
389        assert len(scheduler_events) == 3
390        assert scheduler_events[2].code == EVENT_JOB_ADDED
391        assert scheduler_events[2].job_id is job.id
392
393        # Check that the undefined values were replaced with scheduler's job defaults
394        assert job.misfire_grace_time == 3
395        assert not job.coalesce
396        assert job.max_instances == 6
397
398    def test_add_job_id_conflict(self, scheduler):
399        """
400        Test that if a job is added with an already existing id, ConflictingIdError is raised.
401
402        """
403        scheduler.start(paused=True)
404        scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1)
405        pytest.raises(ConflictingIdError, scheduler.add_job, lambda: None, 'interval',
406                      id='testjob', seconds=1)
407
408    def test_add_job_replace(self, scheduler):
409        """Test that with replace_existing=True, a new job replaces another with the same id."""
410        scheduler.start(paused=True)
411        scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1)
412        scheduler.add_job(lambda: None, 'cron', id='testjob', name='replacement',
413                          replace_existing=True)
414        jobs = scheduler.get_jobs()
415        assert len(jobs) == 1
416        assert jobs[0].name == 'replacement'
417
418    def test_scheduled_job(self, scheduler):
419        def func(x, y):
420            pass
421
422        scheduler.add_job = MagicMock()
423        decorator = scheduler.scheduled_job('date', [1], {'y': 2}, 'my-id',
424                                            'dummy', run_date='2014-06-01 08:41:00')
425        decorator(func)
426
427        scheduler.add_job.assert_called_once_with(
428            func, 'date', [1], {'y': 2}, 'my-id', 'dummy', undefined, undefined, undefined,
429            undefined, 'default', 'default', True, run_date='2014-06-01 08:41:00')
430
431    @pytest.mark.parametrize('pending', [True, False], ids=['pending job', 'scheduled job'])
432    def test_modify_job(self, scheduler, pending, timezone):
433        job = MagicMock()
434        scheduler._dispatch_event = MagicMock()
435        scheduler._lookup_job = MagicMock(return_value=(job, None if pending else 'default'))
436        if not pending:
437            jobstore = MagicMock()
438            scheduler._lookup_jobstore = lambda alias: jobstore if alias == 'default' else None
439        scheduler.modify_job('blah', misfire_grace_time=5, max_instances=2,
440                             next_run_time=datetime(2014, 10, 17))
441
442        job._modify.assert_called_once_with(misfire_grace_time=5, max_instances=2,
443                                            next_run_time=datetime(2014, 10, 17))
444        if not pending:
445            jobstore.update_job.assert_called_once_with(job)
446
447        assert scheduler._dispatch_event.call_count == 1
448        event = scheduler._dispatch_event.call_args[0][0]
449        assert event.code == EVENT_JOB_MODIFIED
450        assert event.jobstore == (None if pending else 'default')
451
452    def test_reschedule_job(self, scheduler):
453        scheduler.modify_job = MagicMock()
454        trigger = MagicMock(get_next_fire_time=lambda previous, now: 1)
455        scheduler._create_trigger = MagicMock(return_value=trigger)
456        scheduler.reschedule_job('my-id', 'jobstore', 'date', run_date='2014-06-01 08:41:00')
457
458        assert scheduler.modify_job.call_count == 1
459        assert scheduler.modify_job.call_args[0] == ('my-id', 'jobstore')
460        assert scheduler.modify_job.call_args[1] == {'trigger': trigger, 'next_run_time': 1}
461
462    def test_pause_job(self, scheduler):
463        scheduler.modify_job = MagicMock()
464        scheduler.pause_job('job_id', 'jobstore')
465
466        scheduler.modify_job.assert_called_once_with('job_id', 'jobstore', next_run_time=None)
467
468    @pytest.mark.parametrize('dead_job', [True, False], ids=['dead job', 'live job'])
469    def test_resume_job(self, scheduler, freeze_time, dead_job):
470        next_fire_time = None if dead_job else freeze_time.current + timedelta(seconds=1)
471        trigger = MagicMock(BaseTrigger, get_next_fire_time=lambda prev, now: next_fire_time)
472        returned_job = MagicMock(Job, id='foo', trigger=trigger)
473        scheduler._lookup_job = MagicMock(return_value=(returned_job, 'bar'))
474        scheduler.modify_job = MagicMock()
475        scheduler.remove_job = MagicMock()
476        scheduler.resume_job('foo')
477
478        if dead_job:
479            scheduler.remove_job.assert_called_once_with('foo', 'bar')
480        else:
481            scheduler.modify_job.assert_called_once_with('foo', 'bar',
482                                                         next_run_time=next_fire_time)
483
484    @pytest.mark.parametrize('scheduler_started', [True, False], ids=['running', 'stopped'])
485    @pytest.mark.parametrize('jobstore', [None, 'other'],
486                             ids=['all jobstores', 'specific jobstore'])
487    def test_get_jobs(self, scheduler, scheduler_started, jobstore):
488        scheduler.add_jobstore(MemoryJobStore(), 'other')
489        scheduler.add_job(lambda: None, 'interval', seconds=1, id='job1')
490        scheduler.add_job(lambda: None, 'interval', seconds=1, id='job2', jobstore='other')
491        if scheduler_started:
492            scheduler.start(paused=True)
493
494        expected_job_ids = {'job2'}
495        if jobstore is None:
496            expected_job_ids.add('job1')
497
498        job_ids = {job.id for job in scheduler.get_jobs(jobstore)}
499        assert job_ids == expected_job_ids
500
501    @pytest.mark.parametrize('jobstore', [None, 'bar'], ids=['any jobstore', 'specific jobstore'])
502    def test_get_job(self, scheduler, jobstore):
503        returned_job = object()
504        scheduler._lookup_job = MagicMock(return_value=(returned_job, 'bar'))
505        job = scheduler.get_job('foo', jobstore)
506
507        assert job is returned_job
508
509    def test_get_job_nonexistent_job(self, scheduler):
510        scheduler._lookup_job = MagicMock(side_effect=JobLookupError('foo'))
511        assert scheduler.get_job('foo') is None
512
513    def test_get_job_nonexistent_jobstore(self, scheduler):
514        assert scheduler.get_job('foo', 'bar') is None
515
516    @pytest.mark.parametrize('start_scheduler', [True, False])
517    @pytest.mark.parametrize('jobstore', [None, 'other'],
518                             ids=['any jobstore', 'specific jobstore'])
519    def test_remove_job(self, scheduler, scheduler_events, start_scheduler, jobstore):
520        scheduler.add_jobstore(MemoryJobStore(), 'other')
521        scheduler.add_job(lambda: None, id='job1')
522        if start_scheduler:
523            scheduler.start(paused=True)
524
525        del scheduler_events[:]
526        if jobstore:
527            pytest.raises(JobLookupError, scheduler.remove_job, 'job1', jobstore)
528            assert len(scheduler.get_jobs()) == 1
529            assert len(scheduler_events) == 0
530        else:
531            scheduler.remove_job('job1', jobstore)
532            assert len(scheduler.get_jobs()) == 0
533            assert len(scheduler_events) == 1
534            assert scheduler_events[0].code == EVENT_JOB_REMOVED
535
536    def test_remove_nonexistent_job(self, scheduler):
537        pytest.raises(JobLookupError, scheduler.remove_job, 'foo')
538
539    @pytest.mark.parametrize('start_scheduler', [True, False])
540    @pytest.mark.parametrize('jobstore', [None, 'other'], ids=['all', 'single jobstore'])
541    def test_remove_all_jobs(self, scheduler, start_scheduler, scheduler_events, jobstore):
542        """
543        Test that remove_all_jobs() removes all jobs from all attached job stores, plus any
544        pending jobs.
545
546        """
547        scheduler.add_jobstore(MemoryJobStore(), 'other')
548        scheduler.add_job(lambda: None, id='job1')
549        scheduler.add_job(lambda: None, id='job2')
550        scheduler.add_job(lambda: None, id='job3', jobstore='other')
551        if start_scheduler:
552            scheduler.start(paused=True)
553
554        del scheduler_events[:]
555        scheduler.remove_all_jobs(jobstore)
556        jobs = scheduler.get_jobs()
557
558        assert len(jobs) == (2 if jobstore else 0)
559        assert len(scheduler_events) == 1
560        assert scheduler_events[0].code == EVENT_ALL_JOBS_REMOVED
561        assert scheduler_events[0].alias == jobstore
562
563    @pytest.mark.parametrize('start_scheduler', [True, False])
564    @pytest.mark.parametrize('jobstore', [None, 'other'],
565                             ids=['all jobstores', 'specific jobstore'])
566    def test_print_jobs(self, scheduler, start_scheduler, jobstore):
567        scheduler.add_jobstore(MemoryJobStore(), 'other')
568        if start_scheduler:
569            scheduler.start(paused=True)
570
571        scheduler.add_job(lambda: None, 'date', run_date='2099-09-09', id='job1',
572                          name='test job 1')
573        scheduler.add_job(lambda: None, 'date', run_date='2099-08-08', id='job2',
574                          name='test job 2', jobstore='other')
575
576        outfile = StringIO()
577        scheduler.print_jobs(jobstore, outfile)
578
579        if jobstore and not start_scheduler:
580            assert outfile.getvalue() == """\
581Pending jobs:
582    test job 2 (trigger: date[2099-08-08 00:00:00 CET], pending)
583"""
584        elif jobstore and start_scheduler:
585            assert outfile.getvalue() == """\
586Jobstore other:
587    test job 2 (trigger: date[2099-08-08 00:00:00 CET], next run at: 2099-08-08 00:00:00 CET)
588"""
589        elif not jobstore and not start_scheduler:
590            assert outfile.getvalue() == """\
591Pending jobs:
592    test job 1 (trigger: date[2099-09-09 00:00:00 CET], pending)
593    test job 2 (trigger: date[2099-08-08 00:00:00 CET], pending)
594"""
595        else:
596            assert outfile.getvalue() == """\
597Jobstore default:
598    test job 1 (trigger: date[2099-09-09 00:00:00 CET], next run at: 2099-09-09 00:00:00 CET)
599Jobstore other:
600    test job 2 (trigger: date[2099-08-08 00:00:00 CET], next run at: 2099-08-08 00:00:00 CET)
601"""
602
603    @pytest.mark.parametrize('config', [
604        {
605            'timezone': 'UTC',
606            'job_defaults': {
607                'misfire_grace_time': '5',
608                'coalesce': 'false',
609                'max_instances': '9',
610            },
611            'executors': {
612                'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'},
613                'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'}
614            },
615            'jobstores': {
616                'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'},
617                'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'}
618            }
619        },
620        {
621            'timezone': utc,
622            'job_defaults': {
623                'misfire_grace_time': 5,
624                'coalesce': False,
625                'max_instances': 9,
626            },
627            'executors': {
628                'default': DummyExecutor(arg1='3', arg2='a'),
629                'alter': DummyExecutor(arg='true')
630            },
631            'jobstores': {
632                'default': DummyJobStore(arg1='3', arg2='a'),
633                'bar': DummyJobStore(arg='false')
634            }
635        }
636    ], ids=['references', 'instances'])
637    def test_configure_private(self, scheduler, config):
638        scheduler._configure(config)
639
640        assert scheduler.timezone is utc
641        assert scheduler._job_defaults == {
642            'misfire_grace_time': 5,
643            'coalesce': False,
644            'max_instances': 9
645        }
646        assert set(six.iterkeys(scheduler._executors)) == set(['default', 'alter'])
647        assert scheduler._executors['default'].args == {'arg1': '3', 'arg2': 'a'}
648        assert scheduler._executors['alter'].args == {'arg': 'true'}
649        assert set(six.iterkeys(scheduler._jobstores)) == set(['default', 'bar'])
650        assert scheduler._jobstores['default'].args == {'arg1': '3', 'arg2': 'a'}
651        assert scheduler._jobstores['bar'].args == {'arg': 'false'}
652
653    def test_configure_private_invalid_executor(self, scheduler):
654        exc = pytest.raises(TypeError, scheduler._configure, {'executors': {'default': 6}})
655        assert str(exc.value) == ("Expected executor instance or dict for executors['default'], "
656                                  "got int instead")
657
658    def test_configure_private_invalid_jobstore(self, scheduler):
659        exc = pytest.raises(TypeError, scheduler._configure, {'jobstores': {'default': 6}})
660        assert str(exc.value) == ("Expected job store instance or dict for jobstores['default'], "
661                                  "got int instead")
662
663    def test_create_default_executor(self, scheduler):
664        executor = scheduler._create_default_executor()
665        assert isinstance(executor, BaseExecutor)
666
667    def test_create_default_jobstore(self, scheduler):
668        store = scheduler._create_default_jobstore()
669        assert isinstance(store, BaseJobStore)
670
671    def test_lookup_executor(self, scheduler):
672        executor = object()
673        scheduler._executors = {'executor': executor}
674        assert scheduler._lookup_executor('executor') is executor
675
676    def test_lookup_executor_nonexistent(self, scheduler):
677        pytest.raises(KeyError, scheduler._lookup_executor, 'executor')
678
679    def test_lookup_jobstore(self, scheduler):
680        store = object()
681        scheduler._jobstores = {'store': store}
682        assert scheduler._lookup_jobstore('store') is store
683
684    def test_lookup_jobstore_nonexistent(self, scheduler):
685        pytest.raises(KeyError, scheduler._lookup_jobstore, 'store')
686
687    def test_dispatch_event(self, scheduler):
688        event = SchedulerEvent(1)
689        scheduler._listeners = [(MagicMock(), 2), (MagicMock(side_effect=Exception), 1),
690                                (MagicMock(), 1)]
691        scheduler._dispatch_event(event)
692
693        assert not scheduler._listeners[0][0].called
694        scheduler._listeners[1][0].assert_called_once_with(event)
695
696    @pytest.mark.parametrize('load_plugin', [True, False], ids=['load plugin', 'plugin loaded'])
697    def test_create_trigger(self, scheduler, load_plugin):
698        """Tests that creating a trigger with an already loaded plugin works."""
699
700        scheduler._trigger_plugins = {}
701        scheduler._trigger_classes = {}
702        if load_plugin:
703            scheduler._trigger_plugins['dummy'] = MagicMock(
704                load=MagicMock(return_value=DummyTrigger))
705        else:
706            scheduler._trigger_classes['dummy'] = DummyTrigger
707
708        result = scheduler._create_trigger('dummy', {'a': 1, 'b': 'x'})
709
710        assert isinstance(result, DummyTrigger)
711        assert result.args == {'a': 1, 'b': 'x', 'timezone': scheduler.timezone}
712
713    def test_create_trigger_instance(self, scheduler):
714        """Tests that passing a trigger instance will return the instance as-is."""
715        trigger_instance = DummyTrigger()
716
717        assert scheduler._create_trigger(trigger_instance, {}) is trigger_instance
718
719    def test_create_trigger_default_type(self, scheduler):
720        """Tests that passing None as the trigger will create a "date" trigger instance."""
721        scheduler._trigger_classes = {'date': DummyTrigger}
722        result = scheduler._create_trigger(None, {'a': 1})
723
724        assert isinstance(result, DummyTrigger)
725        assert result.args == {'a': 1, 'timezone': scheduler.timezone}
726
727    def test_create_trigger_bad_trigger_type(self, scheduler):
728        exc = pytest.raises(TypeError, scheduler._create_trigger, 1, {})
729        assert str(exc.value) == 'Expected a trigger instance or string, got int instead'
730
731    def test_create_trigger_bad_plugin_type(self, scheduler):
732        mock_plugin = MagicMock()
733        mock_plugin.load.configure_mock(return_value=object)
734        scheduler._trigger_classes = {}
735        scheduler._trigger_plugins = {'dummy': mock_plugin}
736        exc = pytest.raises(TypeError, scheduler._create_trigger, 'dummy', {})
737        assert str(exc.value) == 'The trigger entry point does not point to a trigger class'
738
739    def test_create_trigger_nonexisting_plugin(self, scheduler):
740        exc = pytest.raises(LookupError, scheduler._create_trigger, 'dummy', {})
741        assert str(exc.value) == 'No trigger by the name "dummy" was found'
742
743    def test_create_lock(self, scheduler):
744        lock = scheduler._create_lock()
745        assert hasattr(lock, '__enter__')
746
747    def test_process_jobs_empty(self, scheduler):
748        assert scheduler._process_jobs() is None
749
750    def test_job_submitted_event(self, scheduler, freeze_time):
751        events = []
752        scheduler.add_job(lambda: None, run_date=freeze_time.get())
753        scheduler.add_listener(events.append, EVENT_JOB_SUBMITTED)
754        scheduler.start()
755        scheduler._process_jobs()
756
757        assert len(events) == 1
758        assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)]
759
760    @pytest.mark.parametrize('scheduler_events', [EVENT_JOB_MAX_INSTANCES],
761                             indirect=['scheduler_events'])
762    def test_job_max_instances_event(self, scheduler, scheduler_events, freeze_time):
763        class MaxedOutExecutor(DebugExecutor):
764            def submit_job(self, job, run_times):
765                raise MaxInstancesReachedError(job)
766
767        executor = MaxedOutExecutor()
768        scheduler.add_executor(executor, 'maxed')
769        scheduler.add_job(lambda: None, run_date=freeze_time.get(), executor='maxed')
770        scheduler.start()
771        scheduler._process_jobs()
772
773        assert len(scheduler_events) == 1
774        assert scheduler_events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)]
775
776    def test_serialize_scheduler(self, scheduler):
777        pytest.raises(TypeError, pickle.dumps, scheduler).match('Schedulers cannot be serialized')
778
779
780class TestProcessJobs(object):
781    @pytest.fixture
782    def job(self):
783        job = MagicMock(Job, id=999, executor='default')
784        job.trigger = MagicMock(get_next_fire_time=MagicMock(return_value=None))
785        job. __str__ = lambda x: 'job 999'
786        return job
787
788    @pytest.fixture
789    def scheduler(self):
790        scheduler = DummyScheduler()
791        scheduler.start()
792        return scheduler
793
794    @pytest.fixture
795    def jobstore(self, scheduler, job):
796        jobstore = MagicMock(BaseJobStore, get_due_jobs=MagicMock(return_value=[job]),
797                             get_next_run_time=MagicMock(return_value=None))
798        scheduler._jobstores['default'] = jobstore
799        return jobstore
800
801    @pytest.fixture
802    def executor(self, scheduler):
803        executor = MagicMock(BaseExecutor)
804        scheduler._executors['default'] = executor
805        return executor
806
807    def test_nonexistent_executor(self, scheduler, jobstore, caplog):
808        """
809        Test that an error is logged and the job is removed from its job store if its executor is
810        not found.
811
812        """
813        caplog.setLevel(logging.ERROR)
814        scheduler.remove_executor('default')
815        assert scheduler._process_jobs() is None
816        jobstore.remove_job.assert_called_once_with(999)
817        assert len(caplog.records()) == 2
818        assert caplog.records()[1].message == \
819            'Executor lookup ("default") failed for job "job 999" -- removing it from the job ' \
820            'store'
821
822    def test_max_instances_reached(self, scheduler, job, jobstore, executor, caplog):
823        """Tests that a warning is logged when the maximum instances of a job is reached."""
824        caplog.setLevel(logging.WARNING)
825        executor.submit_job = MagicMock(side_effect=MaxInstancesReachedError(job))
826
827        assert scheduler._process_jobs() is None
828        assert len(caplog.records()) == 2
829        assert caplog.records()[1].message == \
830            'Execution of job "job 999" skipped: maximum number of running instances reached (1)'
831
832    def test_executor_error(self, scheduler, jobstore, executor, caplog):
833        """Tests that if any exception is raised in executor.submit(), it is logged."""
834        caplog.setLevel(logging.ERROR)
835        executor.submit_job = MagicMock(side_effect=Exception('test message'))
836
837        assert scheduler._process_jobs() is None
838        assert len(caplog.records()) == 2
839        assert 'test message' in caplog.records()[1].exc_text
840        assert 'Error submitting job "job 999" to executor "default"' in caplog.records()[1].message
841
842    def test_job_update(self, scheduler, job, jobstore, freeze_time):
843        """
844        Tests that the job is updated in its job store with the next run time from the trigger.
845
846        """
847        next_run_time = freeze_time.current + timedelta(seconds=6)
848        job.trigger.get_next_fire_time = MagicMock(return_value=next_run_time)
849        assert scheduler._process_jobs() is None
850        job._modify.assert_called_once_with(next_run_time=next_run_time)
851        jobstore.update_job.assert_called_once_with(job)
852
853    def test_wait_time(self, scheduler, freeze_time):
854        """
855        Tests that the earliest next run time from all job stores is returned (ignoring Nones).
856
857        """
858        scheduler._jobstores = {
859            'default': MagicMock(get_next_run_time=MagicMock(
860                return_value=freeze_time.current + timedelta(seconds=8))),
861            'alter': MagicMock(get_next_run_time=MagicMock(return_value=None)),
862            'another': MagicMock(get_next_run_time=MagicMock(
863                return_value=freeze_time.current + timedelta(seconds=5))),
864            'more': MagicMock(get_next_run_time=MagicMock(
865                return_value=freeze_time.current + timedelta(seconds=6))),
866        }
867
868        assert scheduler._process_jobs() == 5
869
870
871class SchedulerImplementationTestBase(object):
872    @pytest.fixture(autouse=True)
873    def executor(self, scheduler):
874        scheduler.add_executor(DebugExecutor())
875
876    @pytest.fixture
877    def start_scheduler(self, request, scheduler):
878        yield scheduler.start
879        if scheduler.running:
880            scheduler.shutdown()
881
882    @pytest.fixture
883    def eventqueue(self, scheduler):
884        from six.moves.queue import Queue
885        events = Queue()
886        scheduler.add_listener(events.put)
887        return events
888
889    def wait_event(self, queue):
890        return queue.get(True, 1)
891
892    def test_add_pending_job(self, scheduler, freeze_time, eventqueue, start_scheduler):
893        """Tests that pending jobs are added (and if due, executed) when the scheduler starts."""
894        freeze_time.set_increment(timedelta(seconds=0.2))
895        scheduler.add_job(lambda x, y: x + y, 'date', args=[1, 2], run_date=freeze_time.next())
896        start_scheduler()
897
898        assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED
899        assert self.wait_event(eventqueue).code == EVENT_JOB_ADDED
900        assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED
901        event = self.wait_event(eventqueue)
902        assert event.code == EVENT_JOB_EXECUTED
903        assert event.retval == 3
904        assert self.wait_event(eventqueue).code == EVENT_JOB_REMOVED
905
906    def test_add_live_job(self, scheduler, freeze_time, eventqueue, start_scheduler):
907        """Tests that adding a job causes it to be executed after the specified delay."""
908        freeze_time.set_increment(timedelta(seconds=0.2))
909        start_scheduler()
910        assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED
911        assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED
912
913        scheduler.add_job(lambda x, y: x + y, 'date', args=[1, 2],
914                          run_date=freeze_time.next() + freeze_time.increment * 2)
915        assert self.wait_event(eventqueue).code == EVENT_JOB_ADDED
916        event = self.wait_event(eventqueue)
917        assert event.code == EVENT_JOB_EXECUTED
918        assert event.retval == 3
919        assert self.wait_event(eventqueue).code == EVENT_JOB_REMOVED
920
921    def test_shutdown(self, scheduler, eventqueue, start_scheduler):
922        """Tests that shutting down the scheduler emits the proper event."""
923        start_scheduler()
924        assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED
925        assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED
926
927        scheduler.shutdown()
928        assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_SHUTDOWN
929
930
931class TestBlockingScheduler(SchedulerImplementationTestBase):
932    @pytest.fixture
933    def scheduler(self):
934        from apscheduler.schedulers.blocking import BlockingScheduler
935        return BlockingScheduler()
936
937    @pytest.fixture
938    def start_scheduler(self, request, scheduler):
939        thread = Thread(target=scheduler.start)
940        yield thread.start
941
942        if scheduler.running:
943            scheduler.shutdown()
944        thread.join()
945
946
947class TestBackgroundScheduler(SchedulerImplementationTestBase):
948    @pytest.fixture
949    def scheduler(self):
950        from apscheduler.schedulers.background import BackgroundScheduler
951        return BackgroundScheduler()
952
953
954class TestAsyncIOScheduler(SchedulerImplementationTestBase):
955    @pytest.fixture
956    def event_loop(self):
957        asyncio = pytest.importorskip('apscheduler.schedulers.asyncio')
958        return asyncio.asyncio.new_event_loop()
959
960    @pytest.fixture
961    def scheduler(self, event_loop):
962        asyncio = pytest.importorskip('apscheduler.schedulers.asyncio')
963        return asyncio.AsyncIOScheduler(event_loop=event_loop)
964
965    @pytest.fixture
966    def start_scheduler(self, request, event_loop, scheduler):
967        event_loop.call_soon_threadsafe(scheduler.start)
968        thread = Thread(target=event_loop.run_forever)
969        yield thread.start
970
971        if scheduler.running:
972            event_loop.call_soon_threadsafe(scheduler.shutdown)
973        event_loop.call_soon_threadsafe(event_loop.stop)
974        thread.join()
975
976
977class TestGeventScheduler(SchedulerImplementationTestBase):
978    @pytest.fixture
979    def scheduler(self):
980        gevent = pytest.importorskip('apscheduler.schedulers.gevent')
981        return gevent.GeventScheduler()
982
983    @pytest.fixture
984    def calc_event(self):
985        from gevent.event import Event
986        return Event()
987
988    @pytest.fixture
989    def eventqueue(self, scheduler):
990        from gevent.queue import Queue
991        events = Queue()
992        scheduler.add_listener(events.put)
993        return events
994
995
996class TestTornadoScheduler(SchedulerImplementationTestBase):
997    @pytest.fixture
998    def io_loop(self):
999        ioloop = pytest.importorskip('tornado.ioloop')
1000        return ioloop.IOLoop()
1001
1002    @pytest.fixture
1003    def scheduler(self, io_loop):
1004        tornado = pytest.importorskip('apscheduler.schedulers.tornado')
1005        return tornado.TornadoScheduler(io_loop=io_loop)
1006
1007    @pytest.fixture
1008    def start_scheduler(self, request, io_loop, scheduler):
1009        io_loop.add_callback(scheduler.start)
1010        thread = Thread(target=io_loop.start)
1011        yield thread.start
1012
1013        if scheduler.running:
1014            io_loop.add_callback(scheduler.shutdown)
1015        io_loop.add_callback(io_loop.stop)
1016        thread.join()
1017
1018
1019class TestTwistedScheduler(SchedulerImplementationTestBase):
1020    @pytest.fixture
1021    def reactor(self):
1022        selectreactor = pytest.importorskip('twisted.internet.selectreactor')
1023        return selectreactor.SelectReactor()
1024
1025    @pytest.fixture
1026    def scheduler(self, reactor):
1027        twisted = pytest.importorskip('apscheduler.schedulers.twisted')
1028        return twisted.TwistedScheduler(reactor=reactor)
1029
1030    @pytest.fixture
1031    def start_scheduler(self, request, reactor, scheduler):
1032        reactor.callFromThread(scheduler.start)
1033        thread = Thread(target=reactor.run, args=(False,))
1034        yield thread.start
1035
1036        if scheduler.running:
1037            reactor.callFromThread(scheduler.shutdown)
1038        reactor.callFromThread(reactor.stop)
1039        thread.join()
1040
1041
1042@pytest.mark.skip
1043class TestQtScheduler(SchedulerImplementationTestBase):
1044    @pytest.fixture(scope='class')
1045    def coreapp(self):
1046        QtCore = pytest.importorskip('PySide.QtCore')
1047        QtCore.QCoreApplication([])
1048
1049    @pytest.fixture
1050    def scheduler(self, coreapp):
1051        qt = pytest.importorskip('apscheduler.schedulers.qt')
1052        return qt.QtScheduler()
1053
1054    def wait_event(self, queue):
1055        from PySide.QtCore import QCoreApplication
1056
1057        while queue.empty():
1058            QCoreApplication.processEvents()
1059        return queue.get_nowait()
1060