1from __future__ import absolute_import, unicode_literals 2 3import gc 4import itertools 5import os 6import ssl 7from copy import deepcopy 8from datetime import datetime, timedelta 9from pickle import dumps, loads 10 11import pytest 12from case import ContextMock, Mock, mock, patch 13from vine import promise 14 15from celery import Celery, _state 16from celery import app as _app 17from celery import current_app, shared_task 18from celery.app import base as _appbase 19from celery.app import defaults 20from celery.exceptions import ImproperlyConfigured 21from celery.five import items, keys 22from celery.loaders.base import unconfigured 23from celery.platforms import pyimplementation 24from celery.utils.collections import DictAttribute 25from celery.utils.objects import Bunch 26from celery.utils.serialization import pickle 27from celery.utils.time import localize, timezone, to_utc 28 29THIS_IS_A_KEY = 'this is a value' 30 31 32class ObjectConfig(object): 33 FOO = 1 34 BAR = 2 35 36 37object_config = ObjectConfig() 38dict_config = {'FOO': 10, 'BAR': 20} 39 40 41class ObjectConfig2(object): 42 LEAVE_FOR_WORK = True 43 MOMENT_TO_STOP = True 44 CALL_ME_BACK = 123456789 45 WANT_ME_TO = False 46 UNDERSTAND_ME = True 47 48 49class test_module: 50 51 def test_default_app(self): 52 assert _app.default_app == _state.default_app 53 54 def test_bugreport(self, app): 55 assert _app.bugreport(app=app) 56 57 58class test_task_join_will_block: 59 60 def test_task_join_will_block(self, patching): 61 patching('celery._state._task_join_will_block', 0) 62 assert _state._task_join_will_block == 0 63 _state._set_task_join_will_block(True) 64 assert _state._task_join_will_block is True 65 # fixture 'app' sets this, so need to use orig_ function 66 # set there by that fixture. 67 res = _state.orig_task_join_will_block() 68 assert res is True 69 70 71class test_App: 72 73 def setup(self): 74 self.app.add_defaults(deepcopy(self.CELERY_TEST_CONFIG)) 75 76 def test_now(self): 77 timezone_setting_value = 'US/Eastern' 78 tz_utc = timezone.get_timezone('UTC') 79 tz_us_eastern = timezone.get_timezone(timezone_setting_value) 80 81 now = to_utc(datetime.utcnow()) 82 app_now = self.app.now() 83 84 assert app_now.tzinfo is tz_utc 85 assert app_now - now <= timedelta(seconds=1) 86 87 # Check that timezone conversion is applied from configuration 88 self.app.conf.enable_utc = False 89 self.app.conf.timezone = timezone_setting_value 90 # timezone is a cached property 91 del self.app.timezone 92 93 app_now = self.app.now() 94 95 assert app_now.tzinfo.zone == tz_us_eastern.zone 96 97 diff = to_utc(datetime.utcnow()) - localize(app_now, tz_utc) 98 assert diff <= timedelta(seconds=1) 99 100 # Verify that timezone setting overrides enable_utc=on setting 101 self.app.conf.enable_utc = True 102 del self.app.timezone 103 app_now = self.app.now() 104 assert self.app.timezone == tz_us_eastern 105 assert app_now.tzinfo.zone == tz_us_eastern.zone 106 107 @patch('celery.app.base.set_default_app') 108 def test_set_default(self, set_default_app): 109 self.app.set_default() 110 set_default_app.assert_called_with(self.app) 111 112 @patch('celery.security.setup_security') 113 def test_setup_security(self, setup_security): 114 self.app.setup_security( 115 {'json'}, 'key', 'cert', 'store', 'digest', 'serializer') 116 setup_security.assert_called_with( 117 {'json'}, 'key', 'cert', 'store', 'digest', 'serializer', 118 app=self.app) 119 120 def test_task_autofinalize_disabled(self): 121 with self.Celery('xyzibari', autofinalize=False) as app: 122 @app.task 123 def ttafd(): 124 return 42 125 126 with pytest.raises(RuntimeError): 127 ttafd() 128 129 with self.Celery('xyzibari', autofinalize=False) as app: 130 @app.task 131 def ttafd2(): 132 return 42 133 134 app.finalize() 135 assert ttafd2() == 42 136 137 def test_registry_autofinalize_disabled(self): 138 with self.Celery('xyzibari', autofinalize=False) as app: 139 with pytest.raises(RuntimeError): 140 app.tasks['celery.chain'] 141 app.finalize() 142 assert app.tasks['celery.chain'] 143 144 def test_task(self): 145 with self.Celery('foozibari') as app: 146 147 def fun(): 148 pass 149 150 fun.__module__ = '__main__' 151 task = app.task(fun) 152 assert task.name == app.main + '.fun' 153 154 def test_task_too_many_args(self): 155 with pytest.raises(TypeError): 156 self.app.task(Mock(name='fun'), True) 157 with pytest.raises(TypeError): 158 self.app.task(Mock(name='fun'), True, 1, 2) 159 160 def test_with_config_source(self): 161 with self.Celery(config_source=ObjectConfig) as app: 162 assert app.conf.FOO == 1 163 assert app.conf.BAR == 2 164 165 @pytest.mark.usefixtures('depends_on_current_app') 166 def test_task_windows_execv(self): 167 prev, _appbase.USING_EXECV = _appbase.USING_EXECV, True 168 try: 169 @self.app.task(shared=False) 170 def foo(): 171 pass 172 173 assert foo._get_current_object() # is proxy 174 175 finally: 176 _appbase.USING_EXECV = prev 177 assert not _appbase.USING_EXECV 178 179 def test_task_takes_no_args(self): 180 with pytest.raises(TypeError): 181 @self.app.task(1) 182 def foo(): 183 pass 184 185 def test_add_defaults(self): 186 assert not self.app.configured 187 _conf = {'foo': 300} 188 189 def conf(): 190 return _conf 191 192 self.app.add_defaults(conf) 193 assert conf in self.app._pending_defaults 194 assert not self.app.configured 195 assert self.app.conf.foo == 300 196 assert self.app.configured 197 assert not self.app._pending_defaults 198 199 # defaults not pickled 200 appr = loads(dumps(self.app)) 201 with pytest.raises(AttributeError): 202 appr.conf.foo 203 204 # add more defaults after configured 205 conf2 = {'foo': 'BAR'} 206 self.app.add_defaults(conf2) 207 assert self.app.conf.foo == 'BAR' 208 209 assert _conf in self.app.conf.defaults 210 assert conf2 in self.app.conf.defaults 211 212 def test_connection_or_acquire(self): 213 with self.app.connection_or_acquire(block=True): 214 assert self.app.pool._dirty 215 216 with self.app.connection_or_acquire(pool=False): 217 assert not self.app.pool._dirty 218 219 def test_using_v1_reduce(self): 220 self.app._using_v1_reduce = True 221 assert loads(dumps(self.app)) 222 223 def test_autodiscover_tasks_force(self): 224 self.app.loader.autodiscover_tasks = Mock() 225 self.app.autodiscover_tasks(['proj.A', 'proj.B'], force=True) 226 self.app.loader.autodiscover_tasks.assert_called_with( 227 ['proj.A', 'proj.B'], 'tasks', 228 ) 229 self.app.loader.autodiscover_tasks = Mock() 230 231 def lazy_list(): 232 return ['proj.A', 'proj.B'] 233 self.app.autodiscover_tasks( 234 lazy_list, 235 related_name='george', 236 force=True, 237 ) 238 self.app.loader.autodiscover_tasks.assert_called_with( 239 ['proj.A', 'proj.B'], 'george', 240 ) 241 242 def test_autodiscover_tasks_lazy(self): 243 with patch('celery.signals.import_modules') as import_modules: 244 def lazy_list(): 245 return [1, 2, 3] 246 self.app.autodiscover_tasks(lazy_list) 247 import_modules.connect.assert_called() 248 prom = import_modules.connect.call_args[0][0] 249 assert isinstance(prom, promise) 250 assert prom.fun == self.app._autodiscover_tasks 251 assert prom.args[0](), [1, 2 == 3] 252 253 def test_autodiscover_tasks__no_packages(self): 254 fixup1 = Mock(name='fixup') 255 fixup2 = Mock(name='fixup') 256 self.app._autodiscover_tasks_from_names = Mock(name='auto') 257 self.app._fixups = [fixup1, fixup2] 258 fixup1.autodiscover_tasks.return_value = ['A', 'B', 'C'] 259 fixup2.autodiscover_tasks.return_value = ['D', 'E', 'F'] 260 self.app.autodiscover_tasks(force=True) 261 self.app._autodiscover_tasks_from_names.assert_called_with( 262 ['A', 'B', 'C', 'D', 'E', 'F'], related_name='tasks', 263 ) 264 265 def test_with_broker(self, patching): 266 patching.setenv('CELERY_BROKER_URL', '') 267 with self.Celery(broker='foo://baribaz') as app: 268 assert app.conf.broker_url == 'foo://baribaz' 269 270 def test_pending_configuration__setattr(self): 271 with self.Celery(broker='foo://bar') as app: 272 app.conf.task_default_delivery_mode = 44 273 app.conf.worker_agent = 'foo:Bar' 274 assert not app.configured 275 assert app.conf.worker_agent == 'foo:Bar' 276 assert app.conf.broker_url == 'foo://bar' 277 assert app._preconf['worker_agent'] == 'foo:Bar' 278 279 assert app.configured 280 reapp = pickle.loads(pickle.dumps(app)) 281 assert reapp._preconf['worker_agent'] == 'foo:Bar' 282 assert not reapp.configured 283 assert reapp.conf.worker_agent == 'foo:Bar' 284 assert reapp.configured 285 assert reapp.conf.broker_url == 'foo://bar' 286 assert reapp._preconf['worker_agent'] == 'foo:Bar' 287 288 def test_pending_configuration__update(self): 289 with self.Celery(broker='foo://bar') as app: 290 app.conf.update( 291 task_default_delivery_mode=44, 292 worker_agent='foo:Bar', 293 ) 294 assert not app.configured 295 assert app.conf.worker_agent == 'foo:Bar' 296 assert app.conf.broker_url == 'foo://bar' 297 assert app._preconf['worker_agent'] == 'foo:Bar' 298 299 def test_pending_configuration__compat_settings(self): 300 with self.Celery(broker='foo://bar', backend='foo') as app: 301 app.conf.update( 302 CELERY_ALWAYS_EAGER=4, 303 CELERY_DEFAULT_DELIVERY_MODE=63, 304 CELERYD_AGENT='foo:Barz', 305 ) 306 assert app.conf.task_always_eager == 4 307 assert app.conf.task_default_delivery_mode == 63 308 assert app.conf.worker_agent == 'foo:Barz' 309 assert app.conf.broker_url == 'foo://bar' 310 assert app.conf.result_backend == 'foo' 311 312 def test_pending_configuration__compat_settings_mixing(self): 313 with self.Celery(broker='foo://bar', backend='foo') as app: 314 app.conf.update( 315 CELERY_ALWAYS_EAGER=4, 316 CELERY_DEFAULT_DELIVERY_MODE=63, 317 CELERYD_AGENT='foo:Barz', 318 worker_consumer='foo:Fooz', 319 ) 320 with pytest.raises(ImproperlyConfigured): 321 assert app.conf.task_always_eager == 4 322 323 def test_pending_configuration__django_settings(self): 324 with self.Celery(broker='foo://bar', backend='foo') as app: 325 app.config_from_object(DictAttribute(Bunch( 326 CELERY_TASK_ALWAYS_EAGER=4, 327 CELERY_TASK_DEFAULT_DELIVERY_MODE=63, 328 CELERY_WORKER_AGENT='foo:Barz', 329 CELERY_RESULT_SERIALIZER='pickle', 330 )), namespace='CELERY') 331 assert app.conf.result_serializer == 'pickle' 332 assert app.conf.CELERY_RESULT_SERIALIZER == 'pickle' 333 assert app.conf.task_always_eager == 4 334 assert app.conf.task_default_delivery_mode == 63 335 assert app.conf.worker_agent == 'foo:Barz' 336 assert app.conf.broker_url == 'foo://bar' 337 assert app.conf.result_backend == 'foo' 338 339 def test_pending_configuration__compat_settings_mixing_new(self): 340 with self.Celery(broker='foo://bar', backend='foo') as app: 341 app.conf.update( 342 task_always_eager=4, 343 task_default_delivery_mode=63, 344 worker_agent='foo:Barz', 345 CELERYD_CONSUMER='foo:Fooz', 346 CELERYD_AUTOSCALER='foo:Xuzzy', 347 ) 348 with pytest.raises(ImproperlyConfigured): 349 assert app.conf.worker_consumer == 'foo:Fooz' 350 351 def test_pending_configuration__compat_settings_mixing_alt(self): 352 with self.Celery(broker='foo://bar', backend='foo') as app: 353 app.conf.update( 354 task_always_eager=4, 355 task_default_delivery_mode=63, 356 worker_agent='foo:Barz', 357 CELERYD_CONSUMER='foo:Fooz', 358 worker_consumer='foo:Fooz', 359 CELERYD_AUTOSCALER='foo:Xuzzy', 360 worker_autoscaler='foo:Xuzzy' 361 ) 362 363 def test_pending_configuration__setdefault(self): 364 with self.Celery(broker='foo://bar') as app: 365 assert not app.configured 366 app.conf.setdefault('worker_agent', 'foo:Bar') 367 assert not app.configured 368 369 def test_pending_configuration__iter(self): 370 with self.Celery(broker='foo://bar') as app: 371 app.conf.worker_agent = 'foo:Bar' 372 assert not app.configured 373 assert list(keys(app.conf)) 374 assert app.configured 375 assert 'worker_agent' in app.conf 376 assert dict(app.conf) 377 378 def test_pending_configuration__raises_ImproperlyConfigured(self): 379 with self.Celery(set_as_current=False) as app: 380 app.conf.worker_agent = 'foo://bar' 381 app.conf.task_default_delivery_mode = 44 382 app.conf.CELERY_ALWAYS_EAGER = 5 383 with pytest.raises(ImproperlyConfigured): 384 app.finalize() 385 386 with self.Celery() as app: 387 assert not self.app.conf.task_always_eager 388 389 def test_pending_configuration__ssl_settings(self): 390 with self.Celery(broker='foo://bar', 391 broker_use_ssl={ 392 'ssl_cert_reqs': ssl.CERT_REQUIRED, 393 'ssl_ca_certs': '/path/to/ca.crt', 394 'ssl_certfile': '/path/to/client.crt', 395 'ssl_keyfile': '/path/to/client.key'}, 396 redis_backend_use_ssl={ 397 'ssl_cert_reqs': ssl.CERT_REQUIRED, 398 'ssl_ca_certs': '/path/to/ca.crt', 399 'ssl_certfile': '/path/to/client.crt', 400 'ssl_keyfile': '/path/to/client.key'}) as app: 401 assert not app.configured 402 assert app.conf.broker_url == 'foo://bar' 403 assert app.conf.broker_use_ssl['ssl_certfile'] == \ 404 '/path/to/client.crt' 405 assert app.conf.broker_use_ssl['ssl_keyfile'] == \ 406 '/path/to/client.key' 407 assert app.conf.broker_use_ssl['ssl_ca_certs'] == \ 408 '/path/to/ca.crt' 409 assert app.conf.broker_use_ssl['ssl_cert_reqs'] == \ 410 ssl.CERT_REQUIRED 411 assert app.conf.redis_backend_use_ssl['ssl_certfile'] == \ 412 '/path/to/client.crt' 413 assert app.conf.redis_backend_use_ssl['ssl_keyfile'] == \ 414 '/path/to/client.key' 415 assert app.conf.redis_backend_use_ssl['ssl_ca_certs'] == \ 416 '/path/to/ca.crt' 417 assert app.conf.redis_backend_use_ssl['ssl_cert_reqs'] == \ 418 ssl.CERT_REQUIRED 419 420 def test_repr(self): 421 assert repr(self.app) 422 423 def test_custom_task_registry(self): 424 with self.Celery(tasks=self.app.tasks) as app2: 425 assert app2.tasks is self.app.tasks 426 427 def test_include_argument(self): 428 with self.Celery(include=('foo', 'bar.foo')) as app: 429 assert app.conf.include, ('foo' == 'bar.foo') 430 431 def test_set_as_current(self): 432 current = _state._tls.current_app 433 try: 434 app = self.Celery(set_as_current=True) 435 assert _state._tls.current_app is app 436 finally: 437 _state._tls.current_app = current 438 439 def test_current_task(self): 440 @self.app.task 441 def foo(shared=False): 442 pass 443 444 _state._task_stack.push(foo) 445 try: 446 assert self.app.current_task.name == foo.name 447 finally: 448 _state._task_stack.pop() 449 450 def test_task_not_shared(self): 451 with patch('celery.app.base.connect_on_app_finalize') as sh: 452 @self.app.task(shared=False) 453 def foo(): 454 pass 455 sh.assert_not_called() 456 457 def test_task_compat_with_filter(self): 458 with self.Celery() as app: 459 check = Mock() 460 461 def filter(task): 462 check(task) 463 return task 464 465 @app.task(filter=filter, shared=False) 466 def foo(): 467 pass 468 check.assert_called_with(foo) 469 470 def test_task_with_filter(self): 471 with self.Celery() as app: 472 check = Mock() 473 474 def filter(task): 475 check(task) 476 return task 477 478 assert not _appbase.USING_EXECV 479 480 @app.task(filter=filter, shared=False) 481 def foo(): 482 pass 483 check.assert_called_with(foo) 484 485 def test_task_sets_main_name_MP_MAIN_FILE(self): 486 from celery.utils import imports as _imports 487 _imports.MP_MAIN_FILE = __file__ 488 try: 489 with self.Celery('xuzzy') as app: 490 491 @app.task 492 def foo(): 493 pass 494 495 assert foo.name == 'xuzzy.foo' 496 finally: 497 _imports.MP_MAIN_FILE = None 498 499 def test_annotate_decorator(self): 500 from celery.app.task import Task 501 502 class adX(Task): 503 504 def run(self, y, z, x): 505 return y, z, x 506 507 check = Mock() 508 509 def deco(fun): 510 511 def _inner(*args, **kwargs): 512 check(*args, **kwargs) 513 return fun(*args, **kwargs) 514 return _inner 515 516 self.app.conf.task_annotations = { 517 adX.name: {'@__call__': deco} 518 } 519 adX.bind(self.app) 520 assert adX.app is self.app 521 522 i = adX() 523 i(2, 4, x=3) 524 check.assert_called_with(i, 2, 4, x=3) 525 526 i.annotate() 527 i.annotate() 528 529 def test_apply_async_adds_children(self): 530 from celery._state import _task_stack 531 532 @self.app.task(bind=True, shared=False) 533 def a3cX1(self): 534 pass 535 536 @self.app.task(bind=True, shared=False) 537 def a3cX2(self): 538 pass 539 540 _task_stack.push(a3cX1) 541 try: 542 a3cX1.push_request(called_directly=False) 543 try: 544 res = a3cX2.apply_async(add_to_parent=True) 545 assert res in a3cX1.request.children 546 finally: 547 a3cX1.pop_request() 548 finally: 549 _task_stack.pop() 550 551 def test_pickle_app(self): 552 changes = {'THE_FOO_BAR': 'bars', 553 'THE_MII_MAR': 'jars'} 554 self.app.conf.update(changes) 555 saved = pickle.dumps(self.app) 556 assert len(saved) < 2048 557 restored = pickle.loads(saved) 558 for key, value in items(changes): 559 assert restored.conf[key] == value 560 561 def test_worker_main(self): 562 from celery.bin import worker as worker_bin 563 564 class worker(worker_bin.worker): 565 566 def execute_from_commandline(self, argv): 567 return argv 568 569 prev, worker_bin.worker = worker_bin.worker, worker 570 try: 571 ret = self.app.worker_main(argv=['--version']) 572 assert ret == ['--version'] 573 finally: 574 worker_bin.worker = prev 575 576 def test_config_from_envvar(self): 577 os.environ['CELERYTEST_CONFIG_OBJECT'] = 't.unit.app.test_app' 578 self.app.config_from_envvar('CELERYTEST_CONFIG_OBJECT') 579 assert self.app.conf.THIS_IS_A_KEY == 'this is a value' 580 581 def assert_config2(self): 582 assert self.app.conf.LEAVE_FOR_WORK 583 assert self.app.conf.MOMENT_TO_STOP 584 assert self.app.conf.CALL_ME_BACK == 123456789 585 assert not self.app.conf.WANT_ME_TO 586 assert self.app.conf.UNDERSTAND_ME 587 588 def test_config_from_object__lazy(self): 589 conf = ObjectConfig2() 590 self.app.config_from_object(conf) 591 assert self.app.loader._conf is unconfigured 592 assert self.app._config_source is conf 593 594 self.assert_config2() 595 596 def test_config_from_object__force(self): 597 self.app.config_from_object(ObjectConfig2(), force=True) 598 assert self.app.loader._conf 599 600 self.assert_config2() 601 602 def test_config_from_object__compat(self): 603 604 class Config(object): 605 CELERY_ALWAYS_EAGER = 44 606 CELERY_DEFAULT_DELIVERY_MODE = 30 607 CELERY_TASK_PUBLISH_RETRY = False 608 609 self.app.config_from_object(Config) 610 assert self.app.conf.task_always_eager == 44 611 assert self.app.conf.CELERY_ALWAYS_EAGER == 44 612 assert not self.app.conf.task_publish_retry 613 assert self.app.conf.task_default_routing_key == 'testcelery' 614 615 def test_config_from_object__supports_old_names(self): 616 617 class Config(object): 618 task_always_eager = 45 619 task_default_delivery_mode = 301 620 621 self.app.config_from_object(Config()) 622 assert self.app.conf.CELERY_ALWAYS_EAGER == 45 623 assert self.app.conf.task_always_eager == 45 624 assert self.app.conf.CELERY_DEFAULT_DELIVERY_MODE == 301 625 assert self.app.conf.task_default_delivery_mode == 301 626 assert self.app.conf.task_default_routing_key == 'testcelery' 627 628 def test_config_from_object__namespace_uppercase(self): 629 630 class Config(object): 631 CELERY_TASK_ALWAYS_EAGER = 44 632 CELERY_TASK_DEFAULT_DELIVERY_MODE = 301 633 634 self.app.config_from_object(Config(), namespace='CELERY') 635 assert self.app.conf.task_always_eager == 44 636 637 def test_config_from_object__namespace_lowercase(self): 638 639 class Config(object): 640 celery_task_always_eager = 44 641 celery_task_default_delivery_mode = 301 642 643 self.app.config_from_object(Config(), namespace='celery') 644 assert self.app.conf.task_always_eager == 44 645 646 def test_config_from_object__mixing_new_and_old(self): 647 648 class Config(object): 649 task_always_eager = 44 650 worker_agent = 'foo:Agent' 651 worker_consumer = 'foo:Consumer' 652 beat_schedule = '/foo/schedule' 653 CELERY_DEFAULT_DELIVERY_MODE = 301 654 655 with pytest.raises(ImproperlyConfigured) as exc: 656 self.app.config_from_object(Config(), force=True) 657 assert exc.args[0].startswith('CELERY_DEFAULT_DELIVERY_MODE') 658 assert 'task_default_delivery_mode' in exc.args[0] 659 660 def test_config_from_object__mixing_old_and_new(self): 661 662 class Config(object): 663 CELERY_ALWAYS_EAGER = 46 664 CELERYD_AGENT = 'foo:Agent' 665 CELERYD_CONSUMER = 'foo:Consumer' 666 CELERYBEAT_SCHEDULE = '/foo/schedule' 667 task_default_delivery_mode = 301 668 669 with pytest.raises(ImproperlyConfigured) as exc: 670 self.app.config_from_object(Config(), force=True) 671 assert exc.args[0].startswith('task_default_delivery_mode') 672 assert 'CELERY_DEFAULT_DELIVERY_MODE' in exc.args[0] 673 674 def test_config_from_cmdline(self): 675 cmdline = ['task_always_eager=no', 676 'result_backend=/dev/null', 677 'worker_prefetch_multiplier=368', 678 '.foobarstring=(string)300', 679 '.foobarint=(int)300', 680 'database_engine_options=(dict){"foo": "bar"}'] 681 self.app.config_from_cmdline(cmdline, namespace='worker') 682 assert not self.app.conf.task_always_eager 683 assert self.app.conf.result_backend == '/dev/null' 684 assert self.app.conf.worker_prefetch_multiplier == 368 685 assert self.app.conf.worker_foobarstring == '300' 686 assert self.app.conf.worker_foobarint == 300 687 assert self.app.conf.database_engine_options == {'foo': 'bar'} 688 689 def test_setting__broker_transport_options(self): 690 691 _args = {'foo': 'bar', 'spam': 'baz'} 692 693 self.app.config_from_object(Bunch()) 694 assert self.app.conf.broker_transport_options == \ 695 {'polling_interval': 0.1} 696 697 self.app.config_from_object(Bunch(broker_transport_options=_args)) 698 assert self.app.conf.broker_transport_options == _args 699 700 def test_Windows_log_color_disabled(self): 701 self.app.IS_WINDOWS = True 702 assert not self.app.log.supports_color(True) 703 704 def test_WorkController(self): 705 x = self.app.WorkController 706 assert x.app is self.app 707 708 def test_Worker(self): 709 x = self.app.Worker 710 assert x.app is self.app 711 712 @pytest.mark.usefixtures('depends_on_current_app') 713 def test_AsyncResult(self): 714 x = self.app.AsyncResult('1') 715 assert x.app is self.app 716 r = loads(dumps(x)) 717 # not set as current, so ends up as default app after reduce 718 assert r.app is current_app._get_current_object() 719 720 def test_get_active_apps(self): 721 assert list(_state._get_active_apps()) 722 723 app1 = self.Celery() 724 appid = id(app1) 725 assert app1 in _state._get_active_apps() 726 app1.close() 727 del(app1) 728 729 gc.collect() 730 731 # weakref removed from list when app goes out of scope. 732 with pytest.raises(StopIteration): 733 next(app for app in _state._get_active_apps() if id(app) == appid) 734 735 def test_config_from_envvar_more(self, key='CELERY_HARNESS_CFG1'): 736 assert not self.app.config_from_envvar( 737 'HDSAJIHWIQHEWQU', force=True, silent=True) 738 with pytest.raises(ImproperlyConfigured): 739 self.app.config_from_envvar( 740 'HDSAJIHWIQHEWQU', force=True, silent=False, 741 ) 742 os.environ[key] = __name__ + '.object_config' 743 assert self.app.config_from_envvar(key, force=True) 744 assert self.app.conf['FOO'] == 1 745 assert self.app.conf['BAR'] == 2 746 747 os.environ[key] = 'unknown_asdwqe.asdwqewqe' 748 with pytest.raises(ImportError): 749 self.app.config_from_envvar(key, silent=False) 750 assert not self.app.config_from_envvar(key, force=True, silent=True) 751 752 os.environ[key] = __name__ + '.dict_config' 753 assert self.app.config_from_envvar(key, force=True) 754 assert self.app.conf['FOO'] == 10 755 assert self.app.conf['BAR'] == 20 756 757 @patch('celery.bin.celery.CeleryCommand.execute_from_commandline') 758 def test_start(self, execute): 759 self.app.start() 760 execute.assert_called() 761 762 @pytest.mark.parametrize('url,expected_fields', [ 763 ('pyamqp://', { 764 'hostname': 'localhost', 765 'userid': 'guest', 766 'password': 'guest', 767 'virtual_host': '/', 768 }), 769 ('pyamqp://:1978/foo', { 770 'port': 1978, 771 'virtual_host': 'foo', 772 }), 773 ('pyamqp:////value', { 774 'virtual_host': '/value', 775 }) 776 ]) 777 def test_amqp_get_broker_info(self, url, expected_fields): 778 info = self.app.connection(url).info() 779 for key, expected_value in items(expected_fields): 780 assert info[key] == expected_value 781 782 def test_amqp_failover_strategy_selection(self): 783 # Test passing in a string and make sure the string 784 # gets there untouched 785 self.app.conf.broker_failover_strategy = 'foo-bar' 786 assert self.app.connection('amqp:////value') \ 787 .failover_strategy == 'foo-bar' 788 789 # Try passing in None 790 self.app.conf.broker_failover_strategy = None 791 assert self.app.connection('amqp:////value') \ 792 .failover_strategy == itertools.cycle 793 794 # Test passing in a method 795 def my_failover_strategy(it): 796 yield True 797 798 self.app.conf.broker_failover_strategy = my_failover_strategy 799 assert self.app.connection('amqp:////value') \ 800 .failover_strategy == my_failover_strategy 801 802 def test_after_fork(self): 803 self.app._pool = Mock() 804 self.app.on_after_fork = Mock(name='on_after_fork') 805 self.app._after_fork() 806 assert self.app._pool is None 807 self.app.on_after_fork.send.assert_called_with(sender=self.app) 808 self.app._after_fork() 809 810 def test_global_after_fork(self): 811 self.app._after_fork = Mock(name='_after_fork') 812 _appbase._after_fork_cleanup_app(self.app) 813 self.app._after_fork.assert_called_with() 814 815 @patch('celery.app.base.logger') 816 def test_after_fork_cleanup_app__raises(self, logger): 817 self.app._after_fork = Mock(name='_after_fork') 818 exc = self.app._after_fork.side_effect = KeyError() 819 _appbase._after_fork_cleanup_app(self.app) 820 logger.info.assert_called_with( 821 'after forker raised exception: %r', exc, exc_info=1) 822 823 def test_ensure_after_fork__no_multiprocessing(self): 824 prev, _appbase.register_after_fork = ( 825 _appbase.register_after_fork, None) 826 try: 827 self.app._after_fork_registered = False 828 self.app._ensure_after_fork() 829 assert self.app._after_fork_registered 830 finally: 831 _appbase.register_after_fork = prev 832 833 def test_canvas(self): 834 assert self.app._canvas.Signature 835 836 def test_signature(self): 837 sig = self.app.signature('foo', (1, 2)) 838 assert sig.app is self.app 839 840 def test_timezone__none_set(self): 841 self.app.conf.timezone = None 842 self.app.conf.enable_utc = True 843 assert self.app.timezone == timezone.utc 844 del self.app.timezone 845 self.app.conf.enable_utc = False 846 assert self.app.timezone == timezone.local 847 848 def test_uses_utc_timezone(self): 849 self.app.conf.timezone = None 850 self.app.conf.enable_utc = True 851 assert self.app.uses_utc_timezone() is True 852 853 self.app.conf.enable_utc = False 854 del self.app.timezone 855 assert self.app.uses_utc_timezone() is False 856 857 self.app.conf.timezone = 'US/Eastern' 858 del self.app.timezone 859 assert self.app.uses_utc_timezone() is False 860 861 self.app.conf.timezone = 'UTC' 862 del self.app.timezone 863 assert self.app.uses_utc_timezone() is True 864 865 def test_compat_on_configure(self): 866 _on_configure = Mock(name='on_configure') 867 868 class CompatApp(Celery): 869 870 def on_configure(self, *args, **kwargs): 871 # on pypy3 if named on_configure the class function 872 # will be called, instead of the mock defined above, 873 # so we add the underscore. 874 _on_configure(*args, **kwargs) 875 876 with CompatApp(set_as_current=False) as app: 877 app.loader = Mock() 878 app.loader.conf = {} 879 app._load_config() 880 _on_configure.assert_called_with() 881 882 def test_add_periodic_task(self): 883 884 @self.app.task 885 def add(x, y): 886 pass 887 assert not self.app.configured 888 self.app.add_periodic_task( 889 10, self.app.signature('add', (2, 2)), 890 name='add1', expires=3, 891 ) 892 assert self.app._pending_periodic_tasks 893 assert not self.app.configured 894 895 sig2 = add.s(4, 4) 896 assert self.app.configured 897 self.app.add_periodic_task(20, sig2, name='add2', expires=4) 898 assert 'add1' in self.app.conf.beat_schedule 899 assert 'add2' in self.app.conf.beat_schedule 900 901 def test_pool_no_multiprocessing(self): 902 with mock.mask_modules('multiprocessing.util'): 903 pool = self.app.pool 904 assert pool is self.app._pool 905 906 def test_bugreport(self): 907 assert self.app.bugreport() 908 909 def test_send_task__connection_provided(self): 910 connection = Mock(name='connection') 911 router = Mock(name='router') 912 router.route.return_value = {} 913 self.app.amqp = Mock(name='amqp') 914 self.app.amqp.Producer.attach_mock(ContextMock(), 'return_value') 915 self.app.send_task('foo', (1, 2), connection=connection, router=router) 916 self.app.amqp.Producer.assert_called_with( 917 connection, auto_declare=False) 918 self.app.amqp.send_task_message.assert_called_with( 919 self.app.amqp.Producer(), 'foo', 920 self.app.amqp.create_task_message()) 921 922 def test_send_task_sent_event(self): 923 924 class Dispatcher(object): 925 sent = [] 926 927 def publish(self, type, fields, *args, **kwargs): 928 self.sent.append((type, fields)) 929 930 conn = self.app.connection() 931 chan = conn.channel() 932 try: 933 for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'): 934 chan.exchange_declare(e, 'direct', durable=True) 935 chan.queue_declare(e, durable=True) 936 chan.queue_bind(e, e, e) 937 finally: 938 chan.close() 939 assert conn.transport_cls == 'memory' 940 941 message = self.app.amqp.create_task_message( 942 'id', 'footask', (), {}, create_sent_event=True, 943 ) 944 945 prod = self.app.amqp.Producer(conn) 946 dispatcher = Dispatcher() 947 self.app.amqp.send_task_message( 948 prod, 'footask', message, 949 exchange='moo_exchange', routing_key='moo_exchange', 950 event_dispatcher=dispatcher, 951 ) 952 assert dispatcher.sent 953 assert dispatcher.sent[0][0] == 'task-sent' 954 self.app.amqp.send_task_message( 955 prod, 'footask', message, event_dispatcher=dispatcher, 956 exchange='bar_exchange', routing_key='bar_exchange', 957 ) 958 959 def test_select_queues(self): 960 self.app.amqp = Mock(name='amqp') 961 self.app.select_queues({'foo', 'bar'}) 962 self.app.amqp.queues.select.assert_called_with({'foo', 'bar'}) 963 964 def test_Beat(self): 965 from celery.apps.beat import Beat 966 beat = self.app.Beat() 967 assert isinstance(beat, Beat) 968 969 def test_registry_cls(self): 970 971 class TaskRegistry(self.app.registry_cls): 972 pass 973 974 class CustomCelery(type(self.app)): 975 registry_cls = TaskRegistry 976 977 app = CustomCelery(set_as_current=False) 978 assert isinstance(app.tasks, TaskRegistry) 979 980 981class test_defaults: 982 983 def test_strtobool(self): 984 for s in ('false', 'no', '0'): 985 assert not defaults.strtobool(s) 986 for s in ('true', 'yes', '1'): 987 assert defaults.strtobool(s) 988 with pytest.raises(TypeError): 989 defaults.strtobool('unsure') 990 991 992class test_debugging_utils: 993 994 def test_enable_disable_trace(self): 995 try: 996 _app.enable_trace() 997 assert _state.app_or_default == _state._app_or_default_trace 998 _app.disable_trace() 999 assert _state.app_or_default == _state._app_or_default 1000 finally: 1001 _app.disable_trace() 1002 1003 1004class test_pyimplementation: 1005 1006 def test_platform_python_implementation(self): 1007 with mock.platform_pyimp(lambda: 'Xython'): 1008 assert pyimplementation() == 'Xython' 1009 1010 def test_platform_jython(self): 1011 with mock.platform_pyimp(): 1012 with mock.sys_platform('java 1.6.51'): 1013 assert 'Jython' in pyimplementation() 1014 1015 def test_platform_pypy(self): 1016 with mock.platform_pyimp(): 1017 with mock.sys_platform('darwin'): 1018 with mock.pypy_version((1, 4, 3)): 1019 assert 'PyPy' in pyimplementation() 1020 with mock.pypy_version((1, 4, 3, 'a4')): 1021 assert 'PyPy' in pyimplementation() 1022 1023 def test_platform_fallback(self): 1024 with mock.platform_pyimp(): 1025 with mock.sys_platform('darwin'): 1026 with mock.pypy_version(): 1027 assert 'CPython' == pyimplementation() 1028 1029 1030class test_shared_task: 1031 1032 def test_registers_to_all_apps(self): 1033 with self.Celery('xproj', set_as_current=True) as xproj: 1034 xproj.finalize() 1035 1036 @shared_task 1037 def foo(): 1038 return 42 1039 1040 @shared_task() 1041 def bar(): 1042 return 84 1043 1044 assert foo.app is xproj 1045 assert bar.app is xproj 1046 assert foo._get_current_object() 1047 1048 with self.Celery('yproj', set_as_current=True) as yproj: 1049 assert foo.app is yproj 1050 assert bar.app is yproj 1051 1052 @shared_task() 1053 def baz(): 1054 return 168 1055 1056 assert baz.app is yproj 1057