1from __future__ import absolute_import, unicode_literals
2
3import gc
4import itertools
5import os
6import ssl
7from copy import deepcopy
8from datetime import datetime, timedelta
9from pickle import dumps, loads
10
11import pytest
12from case import ContextMock, Mock, mock, patch
13from vine import promise
14
15from celery import Celery, _state
16from celery import app as _app
17from celery import current_app, shared_task
18from celery.app import base as _appbase
19from celery.app import defaults
20from celery.exceptions import ImproperlyConfigured
21from celery.five import items, keys
22from celery.loaders.base import unconfigured
23from celery.platforms import pyimplementation
24from celery.utils.collections import DictAttribute
25from celery.utils.objects import Bunch
26from celery.utils.serialization import pickle
27from celery.utils.time import localize, timezone, to_utc
28
29THIS_IS_A_KEY = 'this is a value'
30
31
32class ObjectConfig(object):
33    FOO = 1
34    BAR = 2
35
36
37object_config = ObjectConfig()
38dict_config = {'FOO': 10, 'BAR': 20}
39
40
41class ObjectConfig2(object):
42    LEAVE_FOR_WORK = True
43    MOMENT_TO_STOP = True
44    CALL_ME_BACK = 123456789
45    WANT_ME_TO = False
46    UNDERSTAND_ME = True
47
48
49class test_module:
50
51    def test_default_app(self):
52        assert _app.default_app == _state.default_app
53
54    def test_bugreport(self, app):
55        assert _app.bugreport(app=app)
56
57
58class test_task_join_will_block:
59
60    def test_task_join_will_block(self, patching):
61        patching('celery._state._task_join_will_block', 0)
62        assert _state._task_join_will_block == 0
63        _state._set_task_join_will_block(True)
64        assert _state._task_join_will_block is True
65        # fixture 'app' sets this, so need to use orig_ function
66        # set there by that fixture.
67        res = _state.orig_task_join_will_block()
68        assert res is True
69
70
71class test_App:
72
73    def setup(self):
74        self.app.add_defaults(deepcopy(self.CELERY_TEST_CONFIG))
75
76    def test_now(self):
77        timezone_setting_value = 'US/Eastern'
78        tz_utc = timezone.get_timezone('UTC')
79        tz_us_eastern = timezone.get_timezone(timezone_setting_value)
80
81        now = to_utc(datetime.utcnow())
82        app_now = self.app.now()
83
84        assert app_now.tzinfo is tz_utc
85        assert app_now - now <= timedelta(seconds=1)
86
87        # Check that timezone conversion is applied from configuration
88        self.app.conf.enable_utc = False
89        self.app.conf.timezone = timezone_setting_value
90        # timezone is a cached property
91        del self.app.timezone
92
93        app_now = self.app.now()
94
95        assert app_now.tzinfo.zone == tz_us_eastern.zone
96
97        diff = to_utc(datetime.utcnow()) - localize(app_now, tz_utc)
98        assert diff <= timedelta(seconds=1)
99
100        # Verify that timezone setting overrides enable_utc=on setting
101        self.app.conf.enable_utc = True
102        del self.app.timezone
103        app_now = self.app.now()
104        assert self.app.timezone == tz_us_eastern
105        assert app_now.tzinfo.zone == tz_us_eastern.zone
106
107    @patch('celery.app.base.set_default_app')
108    def test_set_default(self, set_default_app):
109        self.app.set_default()
110        set_default_app.assert_called_with(self.app)
111
112    @patch('celery.security.setup_security')
113    def test_setup_security(self, setup_security):
114        self.app.setup_security(
115            {'json'}, 'key', 'cert', 'store', 'digest', 'serializer')
116        setup_security.assert_called_with(
117            {'json'}, 'key', 'cert', 'store', 'digest', 'serializer',
118            app=self.app)
119
120    def test_task_autofinalize_disabled(self):
121        with self.Celery('xyzibari', autofinalize=False) as app:
122            @app.task
123            def ttafd():
124                return 42
125
126            with pytest.raises(RuntimeError):
127                ttafd()
128
129        with self.Celery('xyzibari', autofinalize=False) as app:
130            @app.task
131            def ttafd2():
132                return 42
133
134            app.finalize()
135            assert ttafd2() == 42
136
137    def test_registry_autofinalize_disabled(self):
138        with self.Celery('xyzibari', autofinalize=False) as app:
139            with pytest.raises(RuntimeError):
140                app.tasks['celery.chain']
141            app.finalize()
142            assert app.tasks['celery.chain']
143
144    def test_task(self):
145        with self.Celery('foozibari') as app:
146
147            def fun():
148                pass
149
150            fun.__module__ = '__main__'
151            task = app.task(fun)
152            assert task.name == app.main + '.fun'
153
154    def test_task_too_many_args(self):
155        with pytest.raises(TypeError):
156            self.app.task(Mock(name='fun'), True)
157        with pytest.raises(TypeError):
158            self.app.task(Mock(name='fun'), True, 1, 2)
159
160    def test_with_config_source(self):
161        with self.Celery(config_source=ObjectConfig) as app:
162            assert app.conf.FOO == 1
163            assert app.conf.BAR == 2
164
165    @pytest.mark.usefixtures('depends_on_current_app')
166    def test_task_windows_execv(self):
167        prev, _appbase.USING_EXECV = _appbase.USING_EXECV, True
168        try:
169            @self.app.task(shared=False)
170            def foo():
171                pass
172
173            assert foo._get_current_object()  # is proxy
174
175        finally:
176            _appbase.USING_EXECV = prev
177        assert not _appbase.USING_EXECV
178
179    def test_task_takes_no_args(self):
180        with pytest.raises(TypeError):
181            @self.app.task(1)
182            def foo():
183                pass
184
185    def test_add_defaults(self):
186        assert not self.app.configured
187        _conf = {'foo': 300}
188
189        def conf():
190            return _conf
191
192        self.app.add_defaults(conf)
193        assert conf in self.app._pending_defaults
194        assert not self.app.configured
195        assert self.app.conf.foo == 300
196        assert self.app.configured
197        assert not self.app._pending_defaults
198
199        # defaults not pickled
200        appr = loads(dumps(self.app))
201        with pytest.raises(AttributeError):
202            appr.conf.foo
203
204        # add more defaults after configured
205        conf2 = {'foo': 'BAR'}
206        self.app.add_defaults(conf2)
207        assert self.app.conf.foo == 'BAR'
208
209        assert _conf in self.app.conf.defaults
210        assert conf2 in self.app.conf.defaults
211
212    def test_connection_or_acquire(self):
213        with self.app.connection_or_acquire(block=True):
214            assert self.app.pool._dirty
215
216        with self.app.connection_or_acquire(pool=False):
217            assert not self.app.pool._dirty
218
219    def test_using_v1_reduce(self):
220        self.app._using_v1_reduce = True
221        assert loads(dumps(self.app))
222
223    def test_autodiscover_tasks_force(self):
224        self.app.loader.autodiscover_tasks = Mock()
225        self.app.autodiscover_tasks(['proj.A', 'proj.B'], force=True)
226        self.app.loader.autodiscover_tasks.assert_called_with(
227            ['proj.A', 'proj.B'], 'tasks',
228        )
229        self.app.loader.autodiscover_tasks = Mock()
230
231        def lazy_list():
232            return ['proj.A', 'proj.B']
233        self.app.autodiscover_tasks(
234            lazy_list,
235            related_name='george',
236            force=True,
237        )
238        self.app.loader.autodiscover_tasks.assert_called_with(
239            ['proj.A', 'proj.B'], 'george',
240        )
241
242    def test_autodiscover_tasks_lazy(self):
243        with patch('celery.signals.import_modules') as import_modules:
244            def lazy_list():
245                return [1, 2, 3]
246            self.app.autodiscover_tasks(lazy_list)
247            import_modules.connect.assert_called()
248            prom = import_modules.connect.call_args[0][0]
249            assert isinstance(prom, promise)
250            assert prom.fun == self.app._autodiscover_tasks
251            assert prom.args[0](), [1, 2 == 3]
252
253    def test_autodiscover_tasks__no_packages(self):
254        fixup1 = Mock(name='fixup')
255        fixup2 = Mock(name='fixup')
256        self.app._autodiscover_tasks_from_names = Mock(name='auto')
257        self.app._fixups = [fixup1, fixup2]
258        fixup1.autodiscover_tasks.return_value = ['A', 'B', 'C']
259        fixup2.autodiscover_tasks.return_value = ['D', 'E', 'F']
260        self.app.autodiscover_tasks(force=True)
261        self.app._autodiscover_tasks_from_names.assert_called_with(
262            ['A', 'B', 'C', 'D', 'E', 'F'], related_name='tasks',
263        )
264
265    def test_with_broker(self, patching):
266        patching.setenv('CELERY_BROKER_URL', '')
267        with self.Celery(broker='foo://baribaz') as app:
268            assert app.conf.broker_url == 'foo://baribaz'
269
270    def test_pending_configuration__setattr(self):
271        with self.Celery(broker='foo://bar') as app:
272            app.conf.task_default_delivery_mode = 44
273            app.conf.worker_agent = 'foo:Bar'
274            assert not app.configured
275            assert app.conf.worker_agent == 'foo:Bar'
276            assert app.conf.broker_url == 'foo://bar'
277            assert app._preconf['worker_agent'] == 'foo:Bar'
278
279            assert app.configured
280            reapp = pickle.loads(pickle.dumps(app))
281            assert reapp._preconf['worker_agent'] == 'foo:Bar'
282            assert not reapp.configured
283            assert reapp.conf.worker_agent == 'foo:Bar'
284            assert reapp.configured
285            assert reapp.conf.broker_url == 'foo://bar'
286            assert reapp._preconf['worker_agent'] == 'foo:Bar'
287
288    def test_pending_configuration__update(self):
289        with self.Celery(broker='foo://bar') as app:
290            app.conf.update(
291                task_default_delivery_mode=44,
292                worker_agent='foo:Bar',
293            )
294            assert not app.configured
295            assert app.conf.worker_agent == 'foo:Bar'
296            assert app.conf.broker_url == 'foo://bar'
297            assert app._preconf['worker_agent'] == 'foo:Bar'
298
299    def test_pending_configuration__compat_settings(self):
300        with self.Celery(broker='foo://bar', backend='foo') as app:
301            app.conf.update(
302                CELERY_ALWAYS_EAGER=4,
303                CELERY_DEFAULT_DELIVERY_MODE=63,
304                CELERYD_AGENT='foo:Barz',
305            )
306            assert app.conf.task_always_eager == 4
307            assert app.conf.task_default_delivery_mode == 63
308            assert app.conf.worker_agent == 'foo:Barz'
309            assert app.conf.broker_url == 'foo://bar'
310            assert app.conf.result_backend == 'foo'
311
312    def test_pending_configuration__compat_settings_mixing(self):
313        with self.Celery(broker='foo://bar', backend='foo') as app:
314            app.conf.update(
315                CELERY_ALWAYS_EAGER=4,
316                CELERY_DEFAULT_DELIVERY_MODE=63,
317                CELERYD_AGENT='foo:Barz',
318                worker_consumer='foo:Fooz',
319            )
320            with pytest.raises(ImproperlyConfigured):
321                assert app.conf.task_always_eager == 4
322
323    def test_pending_configuration__django_settings(self):
324        with self.Celery(broker='foo://bar', backend='foo') as app:
325            app.config_from_object(DictAttribute(Bunch(
326                CELERY_TASK_ALWAYS_EAGER=4,
327                CELERY_TASK_DEFAULT_DELIVERY_MODE=63,
328                CELERY_WORKER_AGENT='foo:Barz',
329                CELERY_RESULT_SERIALIZER='pickle',
330            )), namespace='CELERY')
331            assert app.conf.result_serializer == 'pickle'
332            assert app.conf.CELERY_RESULT_SERIALIZER == 'pickle'
333            assert app.conf.task_always_eager == 4
334            assert app.conf.task_default_delivery_mode == 63
335            assert app.conf.worker_agent == 'foo:Barz'
336            assert app.conf.broker_url == 'foo://bar'
337            assert app.conf.result_backend == 'foo'
338
339    def test_pending_configuration__compat_settings_mixing_new(self):
340        with self.Celery(broker='foo://bar', backend='foo') as app:
341            app.conf.update(
342                task_always_eager=4,
343                task_default_delivery_mode=63,
344                worker_agent='foo:Barz',
345                CELERYD_CONSUMER='foo:Fooz',
346                CELERYD_AUTOSCALER='foo:Xuzzy',
347            )
348            with pytest.raises(ImproperlyConfigured):
349                assert app.conf.worker_consumer == 'foo:Fooz'
350
351    def test_pending_configuration__compat_settings_mixing_alt(self):
352        with self.Celery(broker='foo://bar', backend='foo') as app:
353            app.conf.update(
354                task_always_eager=4,
355                task_default_delivery_mode=63,
356                worker_agent='foo:Barz',
357                CELERYD_CONSUMER='foo:Fooz',
358                worker_consumer='foo:Fooz',
359                CELERYD_AUTOSCALER='foo:Xuzzy',
360                worker_autoscaler='foo:Xuzzy'
361            )
362
363    def test_pending_configuration__setdefault(self):
364        with self.Celery(broker='foo://bar') as app:
365            assert not app.configured
366            app.conf.setdefault('worker_agent', 'foo:Bar')
367            assert not app.configured
368
369    def test_pending_configuration__iter(self):
370        with self.Celery(broker='foo://bar') as app:
371            app.conf.worker_agent = 'foo:Bar'
372            assert not app.configured
373            assert list(keys(app.conf))
374            assert app.configured
375            assert 'worker_agent' in app.conf
376            assert dict(app.conf)
377
378    def test_pending_configuration__raises_ImproperlyConfigured(self):
379        with self.Celery(set_as_current=False) as app:
380            app.conf.worker_agent = 'foo://bar'
381            app.conf.task_default_delivery_mode = 44
382            app.conf.CELERY_ALWAYS_EAGER = 5
383            with pytest.raises(ImproperlyConfigured):
384                app.finalize()
385
386        with self.Celery() as app:
387            assert not self.app.conf.task_always_eager
388
389    def test_pending_configuration__ssl_settings(self):
390        with self.Celery(broker='foo://bar',
391                         broker_use_ssl={
392                             'ssl_cert_reqs': ssl.CERT_REQUIRED,
393                             'ssl_ca_certs': '/path/to/ca.crt',
394                             'ssl_certfile': '/path/to/client.crt',
395                             'ssl_keyfile': '/path/to/client.key'},
396                         redis_backend_use_ssl={
397                             'ssl_cert_reqs': ssl.CERT_REQUIRED,
398                             'ssl_ca_certs': '/path/to/ca.crt',
399                             'ssl_certfile': '/path/to/client.crt',
400                             'ssl_keyfile': '/path/to/client.key'}) as app:
401            assert not app.configured
402            assert app.conf.broker_url == 'foo://bar'
403            assert app.conf.broker_use_ssl['ssl_certfile'] == \
404                '/path/to/client.crt'
405            assert app.conf.broker_use_ssl['ssl_keyfile'] == \
406                '/path/to/client.key'
407            assert app.conf.broker_use_ssl['ssl_ca_certs'] == \
408                '/path/to/ca.crt'
409            assert app.conf.broker_use_ssl['ssl_cert_reqs'] == \
410                ssl.CERT_REQUIRED
411            assert app.conf.redis_backend_use_ssl['ssl_certfile'] == \
412                '/path/to/client.crt'
413            assert app.conf.redis_backend_use_ssl['ssl_keyfile'] == \
414                '/path/to/client.key'
415            assert app.conf.redis_backend_use_ssl['ssl_ca_certs'] == \
416                '/path/to/ca.crt'
417            assert app.conf.redis_backend_use_ssl['ssl_cert_reqs'] == \
418                ssl.CERT_REQUIRED
419
420    def test_repr(self):
421        assert repr(self.app)
422
423    def test_custom_task_registry(self):
424        with self.Celery(tasks=self.app.tasks) as app2:
425            assert app2.tasks is self.app.tasks
426
427    def test_include_argument(self):
428        with self.Celery(include=('foo', 'bar.foo')) as app:
429            assert app.conf.include, ('foo' == 'bar.foo')
430
431    def test_set_as_current(self):
432        current = _state._tls.current_app
433        try:
434            app = self.Celery(set_as_current=True)
435            assert _state._tls.current_app is app
436        finally:
437            _state._tls.current_app = current
438
439    def test_current_task(self):
440        @self.app.task
441        def foo(shared=False):
442            pass
443
444        _state._task_stack.push(foo)
445        try:
446            assert self.app.current_task.name == foo.name
447        finally:
448            _state._task_stack.pop()
449
450    def test_task_not_shared(self):
451        with patch('celery.app.base.connect_on_app_finalize') as sh:
452            @self.app.task(shared=False)
453            def foo():
454                pass
455            sh.assert_not_called()
456
457    def test_task_compat_with_filter(self):
458        with self.Celery() as app:
459            check = Mock()
460
461            def filter(task):
462                check(task)
463                return task
464
465            @app.task(filter=filter, shared=False)
466            def foo():
467                pass
468            check.assert_called_with(foo)
469
470    def test_task_with_filter(self):
471        with self.Celery() as app:
472            check = Mock()
473
474            def filter(task):
475                check(task)
476                return task
477
478            assert not _appbase.USING_EXECV
479
480            @app.task(filter=filter, shared=False)
481            def foo():
482                pass
483            check.assert_called_with(foo)
484
485    def test_task_sets_main_name_MP_MAIN_FILE(self):
486        from celery.utils import imports as _imports
487        _imports.MP_MAIN_FILE = __file__
488        try:
489            with self.Celery('xuzzy') as app:
490
491                @app.task
492                def foo():
493                    pass
494
495                assert foo.name == 'xuzzy.foo'
496        finally:
497            _imports.MP_MAIN_FILE = None
498
499    def test_annotate_decorator(self):
500        from celery.app.task import Task
501
502        class adX(Task):
503
504            def run(self, y, z, x):
505                return y, z, x
506
507        check = Mock()
508
509        def deco(fun):
510
511            def _inner(*args, **kwargs):
512                check(*args, **kwargs)
513                return fun(*args, **kwargs)
514            return _inner
515
516        self.app.conf.task_annotations = {
517            adX.name: {'@__call__': deco}
518        }
519        adX.bind(self.app)
520        assert adX.app is self.app
521
522        i = adX()
523        i(2, 4, x=3)
524        check.assert_called_with(i, 2, 4, x=3)
525
526        i.annotate()
527        i.annotate()
528
529    def test_apply_async_adds_children(self):
530        from celery._state import _task_stack
531
532        @self.app.task(bind=True, shared=False)
533        def a3cX1(self):
534            pass
535
536        @self.app.task(bind=True, shared=False)
537        def a3cX2(self):
538            pass
539
540        _task_stack.push(a3cX1)
541        try:
542            a3cX1.push_request(called_directly=False)
543            try:
544                res = a3cX2.apply_async(add_to_parent=True)
545                assert res in a3cX1.request.children
546            finally:
547                a3cX1.pop_request()
548        finally:
549            _task_stack.pop()
550
551    def test_pickle_app(self):
552        changes = {'THE_FOO_BAR': 'bars',
553                   'THE_MII_MAR': 'jars'}
554        self.app.conf.update(changes)
555        saved = pickle.dumps(self.app)
556        assert len(saved) < 2048
557        restored = pickle.loads(saved)
558        for key, value in items(changes):
559            assert restored.conf[key] == value
560
561    def test_worker_main(self):
562        from celery.bin import worker as worker_bin
563
564        class worker(worker_bin.worker):
565
566            def execute_from_commandline(self, argv):
567                return argv
568
569        prev, worker_bin.worker = worker_bin.worker, worker
570        try:
571            ret = self.app.worker_main(argv=['--version'])
572            assert ret == ['--version']
573        finally:
574            worker_bin.worker = prev
575
576    def test_config_from_envvar(self):
577        os.environ['CELERYTEST_CONFIG_OBJECT'] = 't.unit.app.test_app'
578        self.app.config_from_envvar('CELERYTEST_CONFIG_OBJECT')
579        assert self.app.conf.THIS_IS_A_KEY == 'this is a value'
580
581    def assert_config2(self):
582        assert self.app.conf.LEAVE_FOR_WORK
583        assert self.app.conf.MOMENT_TO_STOP
584        assert self.app.conf.CALL_ME_BACK == 123456789
585        assert not self.app.conf.WANT_ME_TO
586        assert self.app.conf.UNDERSTAND_ME
587
588    def test_config_from_object__lazy(self):
589        conf = ObjectConfig2()
590        self.app.config_from_object(conf)
591        assert self.app.loader._conf is unconfigured
592        assert self.app._config_source is conf
593
594        self.assert_config2()
595
596    def test_config_from_object__force(self):
597        self.app.config_from_object(ObjectConfig2(), force=True)
598        assert self.app.loader._conf
599
600        self.assert_config2()
601
602    def test_config_from_object__compat(self):
603
604        class Config(object):
605            CELERY_ALWAYS_EAGER = 44
606            CELERY_DEFAULT_DELIVERY_MODE = 30
607            CELERY_TASK_PUBLISH_RETRY = False
608
609        self.app.config_from_object(Config)
610        assert self.app.conf.task_always_eager == 44
611        assert self.app.conf.CELERY_ALWAYS_EAGER == 44
612        assert not self.app.conf.task_publish_retry
613        assert self.app.conf.task_default_routing_key == 'testcelery'
614
615    def test_config_from_object__supports_old_names(self):
616
617        class Config(object):
618            task_always_eager = 45
619            task_default_delivery_mode = 301
620
621        self.app.config_from_object(Config())
622        assert self.app.conf.CELERY_ALWAYS_EAGER == 45
623        assert self.app.conf.task_always_eager == 45
624        assert self.app.conf.CELERY_DEFAULT_DELIVERY_MODE == 301
625        assert self.app.conf.task_default_delivery_mode == 301
626        assert self.app.conf.task_default_routing_key == 'testcelery'
627
628    def test_config_from_object__namespace_uppercase(self):
629
630        class Config(object):
631            CELERY_TASK_ALWAYS_EAGER = 44
632            CELERY_TASK_DEFAULT_DELIVERY_MODE = 301
633
634        self.app.config_from_object(Config(), namespace='CELERY')
635        assert self.app.conf.task_always_eager == 44
636
637    def test_config_from_object__namespace_lowercase(self):
638
639        class Config(object):
640            celery_task_always_eager = 44
641            celery_task_default_delivery_mode = 301
642
643        self.app.config_from_object(Config(), namespace='celery')
644        assert self.app.conf.task_always_eager == 44
645
646    def test_config_from_object__mixing_new_and_old(self):
647
648        class Config(object):
649            task_always_eager = 44
650            worker_agent = 'foo:Agent'
651            worker_consumer = 'foo:Consumer'
652            beat_schedule = '/foo/schedule'
653            CELERY_DEFAULT_DELIVERY_MODE = 301
654
655        with pytest.raises(ImproperlyConfigured) as exc:
656            self.app.config_from_object(Config(), force=True)
657            assert exc.args[0].startswith('CELERY_DEFAULT_DELIVERY_MODE')
658            assert 'task_default_delivery_mode' in exc.args[0]
659
660    def test_config_from_object__mixing_old_and_new(self):
661
662        class Config(object):
663            CELERY_ALWAYS_EAGER = 46
664            CELERYD_AGENT = 'foo:Agent'
665            CELERYD_CONSUMER = 'foo:Consumer'
666            CELERYBEAT_SCHEDULE = '/foo/schedule'
667            task_default_delivery_mode = 301
668
669        with pytest.raises(ImproperlyConfigured) as exc:
670            self.app.config_from_object(Config(), force=True)
671            assert exc.args[0].startswith('task_default_delivery_mode')
672            assert 'CELERY_DEFAULT_DELIVERY_MODE' in exc.args[0]
673
674    def test_config_from_cmdline(self):
675        cmdline = ['task_always_eager=no',
676                   'result_backend=/dev/null',
677                   'worker_prefetch_multiplier=368',
678                   '.foobarstring=(string)300',
679                   '.foobarint=(int)300',
680                   'database_engine_options=(dict){"foo": "bar"}']
681        self.app.config_from_cmdline(cmdline, namespace='worker')
682        assert not self.app.conf.task_always_eager
683        assert self.app.conf.result_backend == '/dev/null'
684        assert self.app.conf.worker_prefetch_multiplier == 368
685        assert self.app.conf.worker_foobarstring == '300'
686        assert self.app.conf.worker_foobarint == 300
687        assert self.app.conf.database_engine_options == {'foo': 'bar'}
688
689    def test_setting__broker_transport_options(self):
690
691        _args = {'foo': 'bar', 'spam': 'baz'}
692
693        self.app.config_from_object(Bunch())
694        assert self.app.conf.broker_transport_options == \
695            {'polling_interval': 0.1}
696
697        self.app.config_from_object(Bunch(broker_transport_options=_args))
698        assert self.app.conf.broker_transport_options == _args
699
700    def test_Windows_log_color_disabled(self):
701        self.app.IS_WINDOWS = True
702        assert not self.app.log.supports_color(True)
703
704    def test_WorkController(self):
705        x = self.app.WorkController
706        assert x.app is self.app
707
708    def test_Worker(self):
709        x = self.app.Worker
710        assert x.app is self.app
711
712    @pytest.mark.usefixtures('depends_on_current_app')
713    def test_AsyncResult(self):
714        x = self.app.AsyncResult('1')
715        assert x.app is self.app
716        r = loads(dumps(x))
717        # not set as current, so ends up as default app after reduce
718        assert r.app is current_app._get_current_object()
719
720    def test_get_active_apps(self):
721        assert list(_state._get_active_apps())
722
723        app1 = self.Celery()
724        appid = id(app1)
725        assert app1 in _state._get_active_apps()
726        app1.close()
727        del(app1)
728
729        gc.collect()
730
731        # weakref removed from list when app goes out of scope.
732        with pytest.raises(StopIteration):
733            next(app for app in _state._get_active_apps() if id(app) == appid)
734
735    def test_config_from_envvar_more(self, key='CELERY_HARNESS_CFG1'):
736        assert not self.app.config_from_envvar(
737            'HDSAJIHWIQHEWQU', force=True, silent=True)
738        with pytest.raises(ImproperlyConfigured):
739            self.app.config_from_envvar(
740                'HDSAJIHWIQHEWQU', force=True, silent=False,
741            )
742        os.environ[key] = __name__ + '.object_config'
743        assert self.app.config_from_envvar(key, force=True)
744        assert self.app.conf['FOO'] == 1
745        assert self.app.conf['BAR'] == 2
746
747        os.environ[key] = 'unknown_asdwqe.asdwqewqe'
748        with pytest.raises(ImportError):
749            self.app.config_from_envvar(key, silent=False)
750        assert not self.app.config_from_envvar(key, force=True, silent=True)
751
752        os.environ[key] = __name__ + '.dict_config'
753        assert self.app.config_from_envvar(key, force=True)
754        assert self.app.conf['FOO'] == 10
755        assert self.app.conf['BAR'] == 20
756
757    @patch('celery.bin.celery.CeleryCommand.execute_from_commandline')
758    def test_start(self, execute):
759        self.app.start()
760        execute.assert_called()
761
762    @pytest.mark.parametrize('url,expected_fields', [
763        ('pyamqp://', {
764            'hostname': 'localhost',
765            'userid': 'guest',
766            'password': 'guest',
767            'virtual_host': '/',
768        }),
769        ('pyamqp://:1978/foo', {
770            'port': 1978,
771            'virtual_host': 'foo',
772        }),
773        ('pyamqp:////value', {
774            'virtual_host': '/value',
775        })
776    ])
777    def test_amqp_get_broker_info(self, url, expected_fields):
778        info = self.app.connection(url).info()
779        for key, expected_value in items(expected_fields):
780            assert info[key] == expected_value
781
782    def test_amqp_failover_strategy_selection(self):
783        # Test passing in a string and make sure the string
784        # gets there untouched
785        self.app.conf.broker_failover_strategy = 'foo-bar'
786        assert self.app.connection('amqp:////value') \
787                       .failover_strategy == 'foo-bar'
788
789        # Try passing in None
790        self.app.conf.broker_failover_strategy = None
791        assert self.app.connection('amqp:////value') \
792                       .failover_strategy == itertools.cycle
793
794        # Test passing in a method
795        def my_failover_strategy(it):
796            yield True
797
798        self.app.conf.broker_failover_strategy = my_failover_strategy
799        assert self.app.connection('amqp:////value') \
800                       .failover_strategy == my_failover_strategy
801
802    def test_after_fork(self):
803        self.app._pool = Mock()
804        self.app.on_after_fork = Mock(name='on_after_fork')
805        self.app._after_fork()
806        assert self.app._pool is None
807        self.app.on_after_fork.send.assert_called_with(sender=self.app)
808        self.app._after_fork()
809
810    def test_global_after_fork(self):
811        self.app._after_fork = Mock(name='_after_fork')
812        _appbase._after_fork_cleanup_app(self.app)
813        self.app._after_fork.assert_called_with()
814
815    @patch('celery.app.base.logger')
816    def test_after_fork_cleanup_app__raises(self, logger):
817        self.app._after_fork = Mock(name='_after_fork')
818        exc = self.app._after_fork.side_effect = KeyError()
819        _appbase._after_fork_cleanup_app(self.app)
820        logger.info.assert_called_with(
821            'after forker raised exception: %r', exc, exc_info=1)
822
823    def test_ensure_after_fork__no_multiprocessing(self):
824        prev, _appbase.register_after_fork = (
825            _appbase.register_after_fork, None)
826        try:
827            self.app._after_fork_registered = False
828            self.app._ensure_after_fork()
829            assert self.app._after_fork_registered
830        finally:
831            _appbase.register_after_fork = prev
832
833    def test_canvas(self):
834        assert self.app._canvas.Signature
835
836    def test_signature(self):
837        sig = self.app.signature('foo', (1, 2))
838        assert sig.app is self.app
839
840    def test_timezone__none_set(self):
841        self.app.conf.timezone = None
842        self.app.conf.enable_utc = True
843        assert self.app.timezone == timezone.utc
844        del self.app.timezone
845        self.app.conf.enable_utc = False
846        assert self.app.timezone == timezone.local
847
848    def test_uses_utc_timezone(self):
849        self.app.conf.timezone = None
850        self.app.conf.enable_utc = True
851        assert self.app.uses_utc_timezone() is True
852
853        self.app.conf.enable_utc = False
854        del self.app.timezone
855        assert self.app.uses_utc_timezone() is False
856
857        self.app.conf.timezone = 'US/Eastern'
858        del self.app.timezone
859        assert self.app.uses_utc_timezone() is False
860
861        self.app.conf.timezone = 'UTC'
862        del self.app.timezone
863        assert self.app.uses_utc_timezone() is True
864
865    def test_compat_on_configure(self):
866        _on_configure = Mock(name='on_configure')
867
868        class CompatApp(Celery):
869
870            def on_configure(self, *args, **kwargs):
871                # on pypy3 if named on_configure the class function
872                # will be called, instead of the mock defined above,
873                # so we add the underscore.
874                _on_configure(*args, **kwargs)
875
876        with CompatApp(set_as_current=False) as app:
877            app.loader = Mock()
878            app.loader.conf = {}
879            app._load_config()
880            _on_configure.assert_called_with()
881
882    def test_add_periodic_task(self):
883
884        @self.app.task
885        def add(x, y):
886            pass
887        assert not self.app.configured
888        self.app.add_periodic_task(
889            10, self.app.signature('add', (2, 2)),
890            name='add1', expires=3,
891        )
892        assert self.app._pending_periodic_tasks
893        assert not self.app.configured
894
895        sig2 = add.s(4, 4)
896        assert self.app.configured
897        self.app.add_periodic_task(20, sig2, name='add2', expires=4)
898        assert 'add1' in self.app.conf.beat_schedule
899        assert 'add2' in self.app.conf.beat_schedule
900
901    def test_pool_no_multiprocessing(self):
902        with mock.mask_modules('multiprocessing.util'):
903            pool = self.app.pool
904            assert pool is self.app._pool
905
906    def test_bugreport(self):
907        assert self.app.bugreport()
908
909    def test_send_task__connection_provided(self):
910        connection = Mock(name='connection')
911        router = Mock(name='router')
912        router.route.return_value = {}
913        self.app.amqp = Mock(name='amqp')
914        self.app.amqp.Producer.attach_mock(ContextMock(), 'return_value')
915        self.app.send_task('foo', (1, 2), connection=connection, router=router)
916        self.app.amqp.Producer.assert_called_with(
917            connection, auto_declare=False)
918        self.app.amqp.send_task_message.assert_called_with(
919            self.app.amqp.Producer(), 'foo',
920            self.app.amqp.create_task_message())
921
922    def test_send_task_sent_event(self):
923
924        class Dispatcher(object):
925            sent = []
926
927            def publish(self, type, fields, *args, **kwargs):
928                self.sent.append((type, fields))
929
930        conn = self.app.connection()
931        chan = conn.channel()
932        try:
933            for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'):
934                chan.exchange_declare(e, 'direct', durable=True)
935                chan.queue_declare(e, durable=True)
936                chan.queue_bind(e, e, e)
937        finally:
938            chan.close()
939        assert conn.transport_cls == 'memory'
940
941        message = self.app.amqp.create_task_message(
942            'id', 'footask', (), {}, create_sent_event=True,
943        )
944
945        prod = self.app.amqp.Producer(conn)
946        dispatcher = Dispatcher()
947        self.app.amqp.send_task_message(
948            prod, 'footask', message,
949            exchange='moo_exchange', routing_key='moo_exchange',
950            event_dispatcher=dispatcher,
951        )
952        assert dispatcher.sent
953        assert dispatcher.sent[0][0] == 'task-sent'
954        self.app.amqp.send_task_message(
955            prod, 'footask', message, event_dispatcher=dispatcher,
956            exchange='bar_exchange', routing_key='bar_exchange',
957        )
958
959    def test_select_queues(self):
960        self.app.amqp = Mock(name='amqp')
961        self.app.select_queues({'foo', 'bar'})
962        self.app.amqp.queues.select.assert_called_with({'foo', 'bar'})
963
964    def test_Beat(self):
965        from celery.apps.beat import Beat
966        beat = self.app.Beat()
967        assert isinstance(beat, Beat)
968
969    def test_registry_cls(self):
970
971        class TaskRegistry(self.app.registry_cls):
972            pass
973
974        class CustomCelery(type(self.app)):
975            registry_cls = TaskRegistry
976
977        app = CustomCelery(set_as_current=False)
978        assert isinstance(app.tasks, TaskRegistry)
979
980
981class test_defaults:
982
983    def test_strtobool(self):
984        for s in ('false', 'no', '0'):
985            assert not defaults.strtobool(s)
986        for s in ('true', 'yes', '1'):
987            assert defaults.strtobool(s)
988        with pytest.raises(TypeError):
989            defaults.strtobool('unsure')
990
991
992class test_debugging_utils:
993
994    def test_enable_disable_trace(self):
995        try:
996            _app.enable_trace()
997            assert _state.app_or_default == _state._app_or_default_trace
998            _app.disable_trace()
999            assert _state.app_or_default == _state._app_or_default
1000        finally:
1001            _app.disable_trace()
1002
1003
1004class test_pyimplementation:
1005
1006    def test_platform_python_implementation(self):
1007        with mock.platform_pyimp(lambda: 'Xython'):
1008            assert pyimplementation() == 'Xython'
1009
1010    def test_platform_jython(self):
1011        with mock.platform_pyimp():
1012            with mock.sys_platform('java 1.6.51'):
1013                assert 'Jython' in pyimplementation()
1014
1015    def test_platform_pypy(self):
1016        with mock.platform_pyimp():
1017            with mock.sys_platform('darwin'):
1018                with mock.pypy_version((1, 4, 3)):
1019                    assert 'PyPy' in pyimplementation()
1020                with mock.pypy_version((1, 4, 3, 'a4')):
1021                    assert 'PyPy' in pyimplementation()
1022
1023    def test_platform_fallback(self):
1024        with mock.platform_pyimp():
1025            with mock.sys_platform('darwin'):
1026                with mock.pypy_version():
1027                    assert 'CPython' == pyimplementation()
1028
1029
1030class test_shared_task:
1031
1032    def test_registers_to_all_apps(self):
1033        with self.Celery('xproj', set_as_current=True) as xproj:
1034            xproj.finalize()
1035
1036            @shared_task
1037            def foo():
1038                return 42
1039
1040            @shared_task()
1041            def bar():
1042                return 84
1043
1044            assert foo.app is xproj
1045            assert bar.app is xproj
1046            assert foo._get_current_object()
1047
1048            with self.Celery('yproj', set_as_current=True) as yproj:
1049                assert foo.app is yproj
1050                assert bar.app is yproj
1051
1052                @shared_task()
1053                def baz():
1054                    return 168
1055
1056                assert baz.app is yproj
1057