1from __future__ import absolute_import, unicode_literals 2 3import logging 4import os 5import signal 6import sys 7 8import pytest 9from billiard.process import current_process 10from case import Mock, mock, patch, skip 11from kombu import Exchange, Queue 12 13from celery import platforms, signals 14from celery.app import trace 15from celery.apps import worker as cd 16from celery.bin.worker import main as worker_main 17from celery.bin.worker import worker 18from celery.exceptions import (ImproperlyConfigured, WorkerShutdown, 19 WorkerTerminate) 20from celery.five import reload as reload_module 21from celery.platforms import EX_FAILURE, EX_OK 22from celery.worker import state 23 24 25@pytest.fixture(autouse=True) 26def reset_worker_optimizations(): 27 yield 28 trace.reset_worker_optimizations() 29 30 31class Worker(cd.Worker): 32 redirect_stdouts = False 33 34 def start(self, *args, **kwargs): 35 self.on_start() 36 37 38class test_Worker: 39 Worker = Worker 40 41 def test_queues_string(self): 42 with mock.stdouts(): 43 w = self.app.Worker() 44 w.setup_queues('foo,bar,baz') 45 assert 'foo' in self.app.amqp.queues 46 47 def test_cpu_count(self): 48 with mock.stdouts(): 49 with patch('celery.worker.worker.cpu_count') as cpu_count: 50 cpu_count.side_effect = NotImplementedError() 51 w = self.app.Worker(concurrency=None) 52 assert w.concurrency == 2 53 w = self.app.Worker(concurrency=5) 54 assert w.concurrency == 5 55 56 def test_windows_B_option(self): 57 with mock.stdouts(): 58 self.app.IS_WINDOWS = True 59 with pytest.raises(SystemExit): 60 worker(app=self.app).run(beat=True) 61 62 def test_setup_concurrency_very_early(self): 63 x = worker() 64 x.run = Mock() 65 with pytest.raises(ImportError): 66 x.execute_from_commandline(['worker', '-P', 'xyzybox']) 67 68 def test_run_from_argv_basic(self): 69 x = worker(app=self.app) 70 x.run = Mock() 71 x.maybe_detach = Mock() 72 73 def run(*args, **kwargs): 74 pass 75 76 x.run = run 77 x.run_from_argv('celery', []) 78 x.maybe_detach.assert_called() 79 80 def test_maybe_detach(self): 81 x = worker(app=self.app) 82 with patch('celery.bin.worker.detached_celeryd') as detached: 83 x.maybe_detach([]) 84 detached.assert_not_called() 85 with pytest.raises(SystemExit): 86 x.maybe_detach(['--detach']) 87 detached.assert_called() 88 89 def test_invalid_loglevel_gives_error(self): 90 with mock.stdouts(): 91 x = worker(app=self.app) 92 with pytest.raises(SystemExit): 93 x.run(loglevel='GRIM_REAPER') 94 95 def test_no_loglevel(self): 96 self.app.Worker = Mock() 97 worker(app=self.app).run(loglevel=None) 98 99 def test_tasklist(self): 100 worker = self.app.Worker() 101 assert worker.app.tasks 102 assert worker.app.finalized 103 assert worker.tasklist(include_builtins=True) 104 worker.tasklist(include_builtins=False) 105 106 def test_extra_info(self): 107 worker = self.app.Worker() 108 worker.loglevel = logging.WARNING 109 assert not worker.extra_info() 110 worker.loglevel = logging.INFO 111 assert worker.extra_info() 112 113 def test_loglevel_string(self): 114 with mock.stdouts(): 115 worker = self.Worker(app=self.app, loglevel='INFO') 116 assert worker.loglevel == logging.INFO 117 118 def test_run_worker(self, patching): 119 handlers = {} 120 121 class Signals(platforms.Signals): 122 123 def __setitem__(self, sig, handler): 124 handlers[sig] = handler 125 126 patching.setattr('celery.platforms.signals', Signals()) 127 with mock.stdouts(): 128 w = self.Worker(app=self.app) 129 w._isatty = False 130 w.on_start() 131 for sig in 'SIGINT', 'SIGHUP', 'SIGTERM': 132 assert sig in handlers 133 134 handlers.clear() 135 w = self.Worker(app=self.app) 136 w._isatty = True 137 w.on_start() 138 for sig in 'SIGINT', 'SIGTERM': 139 assert sig in handlers 140 assert 'SIGHUP' not in handlers 141 142 def test_startup_info(self): 143 with mock.stdouts(): 144 worker = self.Worker(app=self.app) 145 worker.on_start() 146 assert worker.startup_info() 147 worker.loglevel = logging.DEBUG 148 assert worker.startup_info() 149 worker.loglevel = logging.INFO 150 assert worker.startup_info() 151 worker.autoscale = 13, 10 152 assert worker.startup_info() 153 154 prev_loader = self.app.loader 155 worker = self.Worker( 156 app=self.app, 157 queues='foo,bar,baz,xuzzy,do,re,mi', 158 ) 159 with patch('celery.apps.worker.qualname') as qualname: 160 qualname.return_value = 'acme.backed_beans.Loader' 161 assert worker.startup_info() 162 163 with patch('celery.apps.worker.qualname') as qualname: 164 qualname.return_value = 'celery.loaders.Loader' 165 assert worker.startup_info() 166 167 from celery.loaders.app import AppLoader 168 self.app.loader = AppLoader(app=self.app) 169 assert worker.startup_info() 170 171 self.app.loader = prev_loader 172 worker.task_events = True 173 assert worker.startup_info() 174 175 # test when there are too few output lines 176 # to draft the ascii art onto 177 prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox'] 178 try: 179 assert worker.startup_info() 180 finally: 181 cd.ARTLINES = prev 182 183 def test_run(self): 184 with mock.stdouts(): 185 self.Worker(app=self.app).on_start() 186 self.Worker(app=self.app, purge=True).on_start() 187 worker = self.Worker(app=self.app) 188 worker.on_start() 189 190 def test_purge_messages(self): 191 with mock.stdouts(): 192 self.Worker(app=self.app).purge_messages() 193 194 def test_init_queues(self): 195 with mock.stdouts(): 196 app = self.app 197 c = app.conf 198 app.amqp.queues = app.amqp.Queues({ 199 'celery': { 200 'exchange': 'celery', 201 'routing_key': 'celery', 202 }, 203 'video': { 204 'exchange': 'video', 205 'routing_key': 'video', 206 }, 207 }) 208 worker = self.Worker(app=self.app) 209 worker.setup_queues(['video']) 210 assert 'video' in app.amqp.queues 211 assert 'video' in app.amqp.queues.consume_from 212 assert 'celery' in app.amqp.queues 213 assert 'celery' not in app.amqp.queues.consume_from 214 215 c.task_create_missing_queues = False 216 del (app.amqp.queues) 217 with pytest.raises(ImproperlyConfigured): 218 self.Worker(app=self.app).setup_queues(['image']) 219 del (app.amqp.queues) 220 c.task_create_missing_queues = True 221 worker = self.Worker(app=self.app) 222 worker.setup_queues(['image']) 223 assert 'image' in app.amqp.queues.consume_from 224 assert app.amqp.queues['image'] == Queue( 225 'image', Exchange('image'), 226 routing_key='image', 227 ) 228 229 def test_autoscale_argument(self): 230 with mock.stdouts(): 231 worker1 = self.Worker(app=self.app, autoscale='10,3') 232 assert worker1.autoscale == [10, 3] 233 worker2 = self.Worker(app=self.app, autoscale='10') 234 assert worker2.autoscale == [10, 0] 235 236 def test_include_argument(self): 237 worker1 = self.Worker(app=self.app, include='os') 238 assert worker1.include == ['os'] 239 worker2 = self.Worker(app=self.app, 240 include='os,sys') 241 assert worker2.include == ['os', 'sys'] 242 self.Worker(app=self.app, include=['os', 'sys']) 243 244 def test_unknown_loglevel(self): 245 with mock.stdouts(): 246 with pytest.raises(SystemExit): 247 worker(app=self.app).run(loglevel='ALIEN') 248 worker1 = self.Worker(app=self.app, loglevel=0xFFFF) 249 assert worker1.loglevel == 0xFFFF 250 251 @patch('os._exit') 252 @skip.if_win32() 253 def test_warns_if_running_as_privileged_user(self, _exit, patching): 254 getuid = patching('os.getuid') 255 256 with mock.stdouts() as (_, stderr): 257 getuid.return_value = 0 258 self.app.conf.accept_content = ['pickle'] 259 worker = self.Worker(app=self.app) 260 worker.on_start() 261 _exit.assert_called_with(1) 262 patching.setattr('celery.platforms.C_FORCE_ROOT', True) 263 worker = self.Worker(app=self.app) 264 worker.on_start() 265 assert 'a very bad idea' in stderr.getvalue() 266 patching.setattr('celery.platforms.C_FORCE_ROOT', False) 267 self.app.conf.accept_content = ['json'] 268 worker = self.Worker(app=self.app) 269 worker.on_start() 270 assert 'superuser' in stderr.getvalue() 271 272 def test_redirect_stdouts(self): 273 with mock.stdouts(): 274 self.Worker(app=self.app, redirect_stdouts=False) 275 with pytest.raises(AttributeError): 276 sys.stdout.logger 277 278 def test_on_start_custom_logging(self): 279 with mock.stdouts(): 280 self.app.log.redirect_stdouts = Mock() 281 worker = self.Worker(app=self.app, redirect_stoutds=True) 282 worker._custom_logging = True 283 worker.on_start() 284 self.app.log.redirect_stdouts.assert_not_called() 285 286 def test_setup_logging_no_color(self): 287 worker = self.Worker( 288 app=self.app, redirect_stdouts=False, no_color=True, 289 ) 290 prev, self.app.log.setup = self.app.log.setup, Mock() 291 try: 292 worker.setup_logging() 293 assert not self.app.log.setup.call_args[1]['colorize'] 294 finally: 295 self.app.log.setup = prev 296 297 def test_startup_info_pool_is_str(self): 298 with mock.stdouts(): 299 worker = self.Worker(app=self.app, redirect_stdouts=False) 300 worker.pool_cls = 'foo' 301 worker.startup_info() 302 303 def test_redirect_stdouts_already_handled(self): 304 logging_setup = [False] 305 306 @signals.setup_logging.connect 307 def on_logging_setup(**kwargs): 308 logging_setup[0] = True 309 310 try: 311 worker = self.Worker(app=self.app, redirect_stdouts=False) 312 worker.app.log.already_setup = False 313 worker.setup_logging() 314 assert logging_setup[0] 315 with pytest.raises(AttributeError): 316 sys.stdout.logger 317 finally: 318 signals.setup_logging.disconnect(on_logging_setup) 319 320 def test_platform_tweaks_macOS(self): 321 322 class macOSWorker(Worker): 323 proxy_workaround_installed = False 324 325 def macOS_proxy_detection_workaround(self): 326 self.proxy_workaround_installed = True 327 328 with mock.stdouts(): 329 worker = macOSWorker(app=self.app, redirect_stdouts=False) 330 331 def install_HUP_nosupport(controller): 332 controller.hup_not_supported_installed = True 333 334 class Controller(object): 335 pass 336 337 prev = cd.install_HUP_not_supported_handler 338 cd.install_HUP_not_supported_handler = install_HUP_nosupport 339 try: 340 worker.app.IS_macOS = True 341 controller = Controller() 342 worker.install_platform_tweaks(controller) 343 assert controller.hup_not_supported_installed 344 assert worker.proxy_workaround_installed 345 finally: 346 cd.install_HUP_not_supported_handler = prev 347 348 def test_general_platform_tweaks(self): 349 350 restart_worker_handler_installed = [False] 351 352 def install_worker_restart_handler(worker): 353 restart_worker_handler_installed[0] = True 354 355 class Controller(object): 356 pass 357 358 with mock.stdouts(): 359 prev = cd.install_worker_restart_handler 360 cd.install_worker_restart_handler = install_worker_restart_handler 361 try: 362 worker = self.Worker(app=self.app) 363 worker.app.IS_macOS = False 364 worker.install_platform_tweaks(Controller()) 365 assert restart_worker_handler_installed[0] 366 finally: 367 cd.install_worker_restart_handler = prev 368 369 def test_on_consumer_ready(self): 370 worker_ready_sent = [False] 371 372 @signals.worker_ready.connect 373 def on_worker_ready(**kwargs): 374 worker_ready_sent[0] = True 375 376 with mock.stdouts(): 377 self.Worker(app=self.app).on_consumer_ready(object()) 378 assert worker_ready_sent[0] 379 380 def test_disable_task_events(self): 381 worker = self.Worker(app=self.app, task_events=False, 382 without_gossip=True, 383 without_heartbeat=True) 384 consumer_steps = worker.blueprint.steps['celery.worker.components.Consumer'].obj.steps 385 assert not any(True for step in consumer_steps 386 if step.alias == 'Events') 387 388 def test_enable_task_events(self): 389 worker = self.Worker(app=self.app, task_events=True) 390 consumer_steps = worker.blueprint.steps['celery.worker.components.Consumer'].obj.steps 391 assert any(True for step in consumer_steps 392 if step.alias == 'Events') 393 394 395@mock.stdouts 396class test_funs: 397 398 def test_active_thread_count(self): 399 assert cd.active_thread_count() 400 401 @skip.unless_module('setproctitle') 402 def test_set_process_status(self): 403 worker = Worker(app=self.app, hostname='xyzza') 404 prev1, sys.argv = sys.argv, ['Arg0'] 405 try: 406 st = worker.set_process_status('Running') 407 assert 'celeryd' in st 408 assert 'xyzza' in st 409 assert 'Running' in st 410 prev2, sys.argv = sys.argv, ['Arg0', 'Arg1'] 411 try: 412 st = worker.set_process_status('Running') 413 assert 'celeryd' in st 414 assert 'xyzza' in st 415 assert 'Running' in st 416 assert 'Arg1' in st 417 finally: 418 sys.argv = prev2 419 finally: 420 sys.argv = prev1 421 422 def test_parse_options(self): 423 cmd = worker() 424 cmd.app = self.app 425 opts, args = cmd.parse_options('worker', ['--concurrency=512', 426 '--heartbeat-interval=10']) 427 assert opts['concurrency'] == 512 428 assert opts['heartbeat_interval'] == 10 429 430 def test_main(self): 431 p, cd.Worker = cd.Worker, Worker 432 s, sys.argv = sys.argv, ['worker', '--discard'] 433 try: 434 worker_main(app=self.app) 435 finally: 436 cd.Worker = p 437 sys.argv = s 438 439 440@mock.stdouts 441class test_signal_handlers: 442 class _Worker(object): 443 hostname = 'foo' 444 stopped = False 445 terminated = False 446 447 def stop(self, in_sighandler=False): 448 self.stopped = True 449 450 def terminate(self, in_sighandler=False): 451 self.terminated = True 452 453 def psig(self, fun, *args, **kwargs): 454 handlers = {} 455 456 class Signals(platforms.Signals): 457 def __setitem__(self, sig, handler): 458 handlers[sig] = handler 459 460 p, platforms.signals = platforms.signals, Signals() 461 try: 462 fun(*args, **kwargs) 463 return handlers 464 finally: 465 platforms.signals = p 466 467 def test_worker_int_handler(self): 468 worker = self._Worker() 469 handlers = self.psig(cd.install_worker_int_handler, worker) 470 next_handlers = {} 471 state.should_stop = None 472 state.should_terminate = None 473 474 class Signals(platforms.Signals): 475 476 def __setitem__(self, sig, handler): 477 next_handlers[sig] = handler 478 479 with patch('celery.apps.worker.active_thread_count') as c: 480 c.return_value = 3 481 p, platforms.signals = platforms.signals, Signals() 482 try: 483 handlers['SIGINT']('SIGINT', object()) 484 assert state.should_stop 485 assert state.should_stop == EX_FAILURE 486 finally: 487 platforms.signals = p 488 state.should_stop = None 489 490 try: 491 next_handlers['SIGINT']('SIGINT', object()) 492 assert state.should_terminate 493 assert state.should_terminate == EX_FAILURE 494 finally: 495 state.should_terminate = None 496 497 with patch('celery.apps.worker.active_thread_count') as c: 498 c.return_value = 1 499 p, platforms.signals = platforms.signals, Signals() 500 try: 501 with pytest.raises(WorkerShutdown): 502 handlers['SIGINT']('SIGINT', object()) 503 finally: 504 platforms.signals = p 505 506 with pytest.raises(WorkerTerminate): 507 next_handlers['SIGINT']('SIGINT', object()) 508 509 @skip.unless_module('multiprocessing') 510 def test_worker_int_handler_only_stop_MainProcess(self): 511 process = current_process() 512 name, process.name = process.name, 'OtherProcess' 513 with patch('celery.apps.worker.active_thread_count') as c: 514 c.return_value = 3 515 try: 516 worker = self._Worker() 517 handlers = self.psig(cd.install_worker_int_handler, worker) 518 handlers['SIGINT']('SIGINT', object()) 519 assert state.should_stop 520 finally: 521 process.name = name 522 state.should_stop = None 523 524 with patch('celery.apps.worker.active_thread_count') as c: 525 c.return_value = 1 526 try: 527 worker = self._Worker() 528 handlers = self.psig(cd.install_worker_int_handler, worker) 529 with pytest.raises(WorkerShutdown): 530 handlers['SIGINT']('SIGINT', object()) 531 finally: 532 process.name = name 533 state.should_stop = None 534 535 def test_install_HUP_not_supported_handler(self): 536 worker = self._Worker() 537 handlers = self.psig(cd.install_HUP_not_supported_handler, worker) 538 handlers['SIGHUP']('SIGHUP', object()) 539 540 @skip.unless_module('multiprocessing') 541 def test_worker_term_hard_handler_only_stop_MainProcess(self): 542 process = current_process() 543 name, process.name = process.name, 'OtherProcess' 544 try: 545 with patch('celery.apps.worker.active_thread_count') as c: 546 c.return_value = 3 547 worker = self._Worker() 548 handlers = self.psig( 549 cd.install_worker_term_hard_handler, worker) 550 try: 551 handlers['SIGQUIT']('SIGQUIT', object()) 552 assert state.should_terminate 553 finally: 554 state.should_terminate = None 555 with patch('celery.apps.worker.active_thread_count') as c: 556 c.return_value = 1 557 worker = self._Worker() 558 handlers = self.psig( 559 cd.install_worker_term_hard_handler, worker) 560 try: 561 with pytest.raises(WorkerTerminate): 562 handlers['SIGQUIT']('SIGQUIT', object()) 563 finally: 564 state.should_terminate = None 565 finally: 566 process.name = name 567 568 def test_worker_term_handler_when_threads(self): 569 with patch('celery.apps.worker.active_thread_count') as c: 570 c.return_value = 3 571 worker = self._Worker() 572 handlers = self.psig(cd.install_worker_term_handler, worker) 573 try: 574 handlers['SIGTERM']('SIGTERM', object()) 575 assert state.should_stop == EX_OK 576 finally: 577 state.should_stop = None 578 579 def test_worker_term_handler_when_single_thread(self): 580 with patch('celery.apps.worker.active_thread_count') as c: 581 c.return_value = 1 582 worker = self._Worker() 583 handlers = self.psig(cd.install_worker_term_handler, worker) 584 try: 585 with pytest.raises(WorkerShutdown): 586 handlers['SIGTERM']('SIGTERM', object()) 587 finally: 588 state.should_stop = None 589 590 @patch('sys.__stderr__') 591 @skip.if_pypy() 592 @skip.if_jython() 593 def test_worker_cry_handler(self, stderr): 594 handlers = self.psig(cd.install_cry_handler) 595 assert handlers['SIGUSR1']('SIGUSR1', object()) is None 596 stderr.write.assert_called() 597 598 @skip.unless_module('multiprocessing') 599 def test_worker_term_handler_only_stop_MainProcess(self): 600 process = current_process() 601 name, process.name = process.name, 'OtherProcess' 602 try: 603 with patch('celery.apps.worker.active_thread_count') as c: 604 c.return_value = 3 605 worker = self._Worker() 606 handlers = self.psig(cd.install_worker_term_handler, worker) 607 handlers['SIGTERM']('SIGTERM', object()) 608 assert state.should_stop == EX_OK 609 with patch('celery.apps.worker.active_thread_count') as c: 610 c.return_value = 1 611 worker = self._Worker() 612 handlers = self.psig(cd.install_worker_term_handler, worker) 613 with pytest.raises(WorkerShutdown): 614 handlers['SIGTERM']('SIGTERM', object()) 615 finally: 616 process.name = name 617 state.should_stop = None 618 619 @skip.unless_symbol('os.execv') 620 @patch('celery.platforms.close_open_fds') 621 @patch('atexit.register') 622 @patch('os.close') 623 def test_worker_restart_handler(self, _close, register, close_open): 624 argv = [] 625 626 def _execv(*args): 627 argv.extend(args) 628 629 execv, os.execv = os.execv, _execv 630 try: 631 worker = self._Worker() 632 handlers = self.psig(cd.install_worker_restart_handler, worker) 633 handlers['SIGHUP']('SIGHUP', object()) 634 assert state.should_stop == EX_OK 635 register.assert_called() 636 callback = register.call_args[0][0] 637 callback() 638 assert argv 639 finally: 640 os.execv = execv 641 state.should_stop = None 642 643 def test_worker_term_hard_handler_when_threaded(self): 644 with patch('celery.apps.worker.active_thread_count') as c: 645 c.return_value = 3 646 worker = self._Worker() 647 handlers = self.psig(cd.install_worker_term_hard_handler, worker) 648 try: 649 handlers['SIGQUIT']('SIGQUIT', object()) 650 assert state.should_terminate 651 finally: 652 state.should_terminate = None 653 654 def test_worker_term_hard_handler_when_single_threaded(self): 655 with patch('celery.apps.worker.active_thread_count') as c: 656 c.return_value = 1 657 worker = self._Worker() 658 handlers = self.psig(cd.install_worker_term_hard_handler, worker) 659 with pytest.raises(WorkerTerminate): 660 handlers['SIGQUIT']('SIGQUIT', object()) 661 662 def test_send_worker_shutting_down_signal(self): 663 with patch('celery.apps.worker.signals.worker_shutting_down') as wsd: 664 worker = self._Worker() 665 handlers = self.psig(cd.install_worker_term_handler, worker) 666 try: 667 with pytest.raises(WorkerShutdown): 668 handlers['SIGTERM']('SIGTERM', object()) 669 finally: 670 state.should_stop = None 671 wsd.send.assert_called_with( 672 sender='foo', sig='SIGTERM', how='Warm', exitcode=0, 673 ) 674 675 @pytest.mark.xfail( 676 not hasattr(signal, "SIGQUIT"), 677 reason="Windows does not support SIGQUIT", 678 raises=AttributeError, 679 ) 680 @patch.dict(os.environ, {"REMAP_SIGTERM": "SIGQUIT"}) 681 def test_send_worker_shutting_down_signal_with_remap_sigquit(self): 682 with patch('celery.apps.worker.signals.worker_shutting_down') as wsd: 683 from billiard import common 684 685 reload_module(common) 686 reload_module(cd) 687 688 worker = self._Worker() 689 handlers = self.psig(cd.install_worker_term_handler, worker) 690 try: 691 with pytest.raises(WorkerTerminate): 692 handlers['SIGTERM']('SIGTERM', object()) 693 finally: 694 state.should_stop = None 695 wsd.send.assert_called_with( 696 sender='foo', sig='SIGTERM', how='Cold', exitcode=1, 697 ) 698