1from __future__ import absolute_import, unicode_literals
2
3import logging
4import os
5import signal
6import sys
7
8import pytest
9from billiard.process import current_process
10from case import Mock, mock, patch, skip
11from kombu import Exchange, Queue
12
13from celery import platforms, signals
14from celery.app import trace
15from celery.apps import worker as cd
16from celery.bin.worker import main as worker_main
17from celery.bin.worker import worker
18from celery.exceptions import (ImproperlyConfigured, WorkerShutdown,
19                               WorkerTerminate)
20from celery.five import reload as reload_module
21from celery.platforms import EX_FAILURE, EX_OK
22from celery.worker import state
23
24
@pytest.fixture(autouse=True)
def reset_worker_optimizations():
    """Reset the task-trace worker optimizations after every test.

    The fixture is ``autouse``, so each test in this module runs between
    the ``yield`` and the call to ``trace.reset_worker_optimizations()``.
    Nothing is set up beforehand; only the teardown step does work.
    """
    # Hand control to the test; everything below runs as teardown.
    yield
    trace.reset_worker_optimizations()
29
30
class Worker(cd.Worker):
    """Test double for ``cd.Worker`` whose ``start()`` only runs ``on_start``.

    The class-level ``redirect_stdouts`` default is ``False``.
    """

    redirect_stdouts = False

    def start(self, *args, **kwargs):
        """Call ``self.on_start()``; all arguments are accepted and ignored."""
        self.on_start()
36
37
class test_Worker:
    """Tests for the worker program (``cd.Worker``) and the ``worker`` command.

    ``self.app`` is not defined in this module; it is presumably injected by
    the test-suite's shared fixtures -- TODO confirm against conftest.
    """

    # Worker class used by the tests below (the module-level test double).
    Worker = Worker

    def test_queues_string(self):
        """A comma-separated queue string makes 'foo' known to the app."""
        with mock.stdouts():
            w = self.app.Worker()
            w.setup_queues('foo,bar,baz')
            assert 'foo' in self.app.amqp.queues

    def test_cpu_count(self):
        """Concurrency is 2 when cpu_count raises; explicit values win."""
        with mock.stdouts():
            with patch('celery.worker.worker.cpu_count') as cpu_count:
                cpu_count.side_effect = NotImplementedError()
                w = self.app.Worker(concurrency=None)
                assert w.concurrency == 2
            w = self.app.Worker(concurrency=5)
            assert w.concurrency == 5

    def test_windows_B_option(self):
        """Running with ``beat=True`` on a Windows app exits."""
        with mock.stdouts():
            self.app.IS_WINDOWS = True
            with pytest.raises(SystemExit):
                worker(app=self.app).run(beat=True)

    def test_setup_concurrency_very_early(self):
        """An unknown pool name ('-P xyzybox') raises ImportError."""
        x = worker()
        x.run = Mock()
        with pytest.raises(ImportError):
            x.execute_from_commandline(['worker', '-P', 'xyzybox'])

    def test_run_from_argv_basic(self):
        """``run_from_argv`` consults ``maybe_detach``."""
        x = worker(app=self.app)
        x.maybe_detach = Mock()

        # A plain function (not a Mock) stands in for the real ``run``.
        def run(*args, **kwargs):
            pass

        x.run = run
        x.run_from_argv('celery', [])
        x.maybe_detach.assert_called()

    def test_maybe_detach(self):
        """Only ``--detach`` triggers ``detached_celeryd`` (and an exit)."""
        x = worker(app=self.app)
        with patch('celery.bin.worker.detached_celeryd') as detached:
            x.maybe_detach([])
            detached.assert_not_called()
            with pytest.raises(SystemExit):
                x.maybe_detach(['--detach'])
            detached.assert_called()

    def test_invalid_loglevel_gives_error(self):
        """An unrecognised log level name makes ``run`` exit."""
        with mock.stdouts():
            x = worker(app=self.app)
            with pytest.raises(SystemExit):
                x.run(loglevel='GRIM_REAPER')

    def test_no_loglevel(self):
        """``run(loglevel=None)`` completes with a mocked ``app.Worker``."""
        self.app.Worker = Mock()
        worker(app=self.app).run(loglevel=None)

    def test_tasklist(self):
        """``tasklist`` is non-empty when builtins are included."""
        worker = self.app.Worker()
        assert worker.app.tasks
        assert worker.app.finalized
        assert worker.tasklist(include_builtins=True)
        # Smoke check only: the result without builtins is not asserted.
        worker.tasklist(include_builtins=False)

    def test_extra_info(self):
        """``extra_info`` is falsy at WARNING and truthy at INFO."""
        worker = self.app.Worker()
        worker.loglevel = logging.WARNING
        assert not worker.extra_info()
        worker.loglevel = logging.INFO
        assert worker.extra_info()

    def test_loglevel_string(self):
        """A log level given by name is converted to its numeric value."""
        with mock.stdouts():
            worker = self.Worker(app=self.app, loglevel='INFO')
            assert worker.loglevel == logging.INFO

    def test_run_worker(self, patching):
        """``on_start`` installs SIGHUP only when not attached to a tty."""
        handlers = {}

        # Record handler registrations instead of installing them.
        class Signals(platforms.Signals):

            def __setitem__(self, sig, handler):
                handlers[sig] = handler

        patching.setattr('celery.platforms.signals', Signals())
        with mock.stdouts():
            w = self.Worker(app=self.app)
            w._isatty = False
            w.on_start()
            for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
                assert sig in handlers

            handlers.clear()
            w = self.Worker(app=self.app)
            w._isatty = True
            w.on_start()
            for sig in 'SIGINT', 'SIGTERM':
                assert sig in handlers
            assert 'SIGHUP' not in handlers

    def test_startup_info(self):
        """``startup_info`` is truthy across many configurations."""
        with mock.stdouts():
            worker = self.Worker(app=self.app)
            worker.on_start()
            assert worker.startup_info()
            worker.loglevel = logging.DEBUG
            assert worker.startup_info()
            worker.loglevel = logging.INFO
            assert worker.startup_info()
            worker.autoscale = 13, 10
            assert worker.startup_info()

            prev_loader = self.app.loader
            worker = self.Worker(
                app=self.app,
                queues='foo,bar,baz,xuzzy,do,re,mi',
            )
            with patch('celery.apps.worker.qualname') as qualname:
                qualname.return_value = 'acme.backed_beans.Loader'
                assert worker.startup_info()

            with patch('celery.apps.worker.qualname') as qualname:
                qualname.return_value = 'celery.loaders.Loader'
                assert worker.startup_info()

            from celery.loaders.app import AppLoader
            self.app.loader = AppLoader(app=self.app)
            assert worker.startup_info()

            self.app.loader = prev_loader
            worker.task_events = True
            assert worker.startup_info()

            # test when there are too few output lines
            # to draft the ascii art onto
            prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
            try:
                assert worker.startup_info()
            finally:
                cd.ARTLINES = prev

    def test_run(self):
        """``on_start`` runs with and without ``purge=True``."""
        with mock.stdouts():
            self.Worker(app=self.app).on_start()
            self.Worker(app=self.app, purge=True).on_start()
            worker = self.Worker(app=self.app)
            worker.on_start()

    def test_purge_messages(self):
        """``purge_messages`` can be called on a fresh worker."""
        with mock.stdouts():
            self.Worker(app=self.app).purge_messages()

    def test_init_queues(self):
        """``setup_queues`` selects queues and honours create-missing."""
        with mock.stdouts():
            app = self.app
            c = app.conf
            app.amqp.queues = app.amqp.Queues({
                'celery': {
                    'exchange': 'celery',
                    'routing_key': 'celery',
                },
                'video': {
                    'exchange': 'video',
                    'routing_key': 'video',
                },
            })
            worker = self.Worker(app=self.app)
            worker.setup_queues(['video'])
            assert 'video' in app.amqp.queues
            assert 'video' in app.amqp.queues.consume_from
            assert 'celery' in app.amqp.queues
            assert 'celery' not in app.amqp.queues.consume_from

            # Unknown queue with creation disabled must be rejected.
            c.task_create_missing_queues = False
            del (app.amqp.queues)
            with pytest.raises(ImproperlyConfigured):
                self.Worker(app=self.app).setup_queues(['image'])
            # With creation enabled the missing queue is declared.
            del (app.amqp.queues)
            c.task_create_missing_queues = True
            worker = self.Worker(app=self.app)
            worker.setup_queues(['image'])
            assert 'image' in app.amqp.queues.consume_from
            assert app.amqp.queues['image'] == Queue(
                'image', Exchange('image'),
                routing_key='image',
            )

    def test_autoscale_argument(self):
        """'max,min' strings become lists; a missing minimum is 0."""
        with mock.stdouts():
            worker1 = self.Worker(app=self.app, autoscale='10,3')
            assert worker1.autoscale == [10, 3]
            worker2 = self.Worker(app=self.app, autoscale='10')
            assert worker2.autoscale == [10, 0]

    def test_include_argument(self):
        """``include`` accepts a comma-separated string or a list."""
        worker1 = self.Worker(app=self.app, include='os')
        assert worker1.include == ['os']
        worker2 = self.Worker(app=self.app,
                              include='os,sys')
        assert worker2.include == ['os', 'sys']
        self.Worker(app=self.app, include=['os', 'sys'])

    def test_unknown_loglevel(self):
        """Unknown level names exit; numeric levels are kept as given."""
        with mock.stdouts():
            with pytest.raises(SystemExit):
                worker(app=self.app).run(loglevel='ALIEN')
            worker1 = self.Worker(app=self.app, loglevel=0xFFFF)
            assert worker1.loglevel == 0xFFFF

    @patch('os._exit')
    @skip.if_win32()
    def test_warns_if_running_as_privileged_user(self, _exit, patching):
        """Starting with uid 0 exits or warns, depending on settings."""
        getuid = patching('os.getuid')

        with mock.stdouts() as (_, stderr):
            getuid.return_value = 0
            # pickle accepted and no C_FORCE_ROOT: expect os._exit(1).
            self.app.conf.accept_content = ['pickle']
            worker = self.Worker(app=self.app)
            worker.on_start()
            _exit.assert_called_with(1)
            # pickle accepted with C_FORCE_ROOT: expect a warning only.
            patching.setattr('celery.platforms.C_FORCE_ROOT', True)
            worker = self.Worker(app=self.app)
            worker.on_start()
            assert 'a very bad idea' in stderr.getvalue()
            # json only: expect the generic superuser warning.
            patching.setattr('celery.platforms.C_FORCE_ROOT', False)
            self.app.conf.accept_content = ['json']
            worker = self.Worker(app=self.app)
            worker.on_start()
            assert 'superuser' in stderr.getvalue()

    def test_redirect_stdouts(self):
        """With redirection off, ``sys.stdout`` has no ``logger``."""
        with mock.stdouts():
            self.Worker(app=self.app, redirect_stdouts=False)
            with pytest.raises(AttributeError):
                sys.stdout.logger

    def test_on_start_custom_logging(self):
        """Custom logging suppresses redirection even when requested."""
        with mock.stdouts():
            self.app.log.redirect_stdouts = Mock()
            # Keyword spelled correctly so redirection is really requested.
            worker = self.Worker(app=self.app, redirect_stdouts=True)
            worker._custom_logging = True
            worker.on_start()
            self.app.log.redirect_stdouts.assert_not_called()

    def test_setup_logging_no_color(self):
        """``no_color=True`` reaches ``app.log.setup`` as falsy colorize."""
        worker = self.Worker(
            app=self.app, redirect_stdouts=False, no_color=True,
        )
        prev, self.app.log.setup = self.app.log.setup, Mock()
        try:
            worker.setup_logging()
            assert not self.app.log.setup.call_args[1]['colorize']
        finally:
            self.app.log.setup = prev

    def test_startup_info_pool_is_str(self):
        """``startup_info`` copes with ``pool_cls`` being a string."""
        with mock.stdouts():
            worker = self.Worker(app=self.app, redirect_stdouts=False)
            worker.pool_cls = 'foo'
            worker.startup_info()

    def test_redirect_stdouts_already_handled(self):
        """A ``setup_logging`` receiver is invoked by ``setup_logging()``."""
        logging_setup = [False]

        @signals.setup_logging.connect
        def on_logging_setup(**kwargs):
            logging_setup[0] = True

        try:
            worker = self.Worker(app=self.app, redirect_stdouts=False)
            worker.app.log.already_setup = False
            worker.setup_logging()
            assert logging_setup[0]
            with pytest.raises(AttributeError):
                sys.stdout.logger
        finally:
            signals.setup_logging.disconnect(on_logging_setup)

    def test_platform_tweaks_macOS(self):
        """On macOS both the HUP stub and proxy workaround are applied."""

        class macOSWorker(Worker):
            proxy_workaround_installed = False

            def macOS_proxy_detection_workaround(self):
                self.proxy_workaround_installed = True

        with mock.stdouts():
            worker = macOSWorker(app=self.app, redirect_stdouts=False)

            def install_HUP_nosupport(controller):
                controller.hup_not_supported_installed = True

            class Controller(object):
                pass

            prev = cd.install_HUP_not_supported_handler
            cd.install_HUP_not_supported_handler = install_HUP_nosupport
            try:
                worker.app.IS_macOS = True
                controller = Controller()
                worker.install_platform_tweaks(controller)
                assert controller.hup_not_supported_installed
                assert worker.proxy_workaround_installed
            finally:
                cd.install_HUP_not_supported_handler = prev

    def test_general_platform_tweaks(self):
        """Off macOS the worker restart handler is installed."""

        restart_worker_handler_installed = [False]

        def install_worker_restart_handler(worker):
            restart_worker_handler_installed[0] = True

        class Controller(object):
            pass

        with mock.stdouts():
            prev = cd.install_worker_restart_handler
            cd.install_worker_restart_handler = install_worker_restart_handler
            try:
                worker = self.Worker(app=self.app)
                worker.app.IS_macOS = False
                worker.install_platform_tweaks(Controller())
                assert restart_worker_handler_installed[0]
            finally:
                cd.install_worker_restart_handler = prev

    def test_on_consumer_ready(self):
        """``on_consumer_ready`` sends the ``worker_ready`` signal."""
        worker_ready_sent = [False]

        @signals.worker_ready.connect
        def on_worker_ready(**kwargs):
            worker_ready_sent[0] = True

        try:
            with mock.stdouts():
                self.Worker(app=self.app).on_consumer_ready(object())
                assert worker_ready_sent[0]
        finally:
            # Detach the receiver so it cannot fire in later tests.
            signals.worker_ready.disconnect(on_worker_ready)

    def test_disable_task_events(self):
        """No 'Events' consumer step when task events are disabled."""
        worker = self.Worker(app=self.app, task_events=False,
                             without_gossip=True,
                             without_heartbeat=True)
        consumer = worker.blueprint.steps['celery.worker.components.Consumer']
        consumer_steps = consumer.obj.steps
        assert not any(step.alias == 'Events' for step in consumer_steps)

    def test_enable_task_events(self):
        """An 'Events' consumer step exists when task events are enabled."""
        worker = self.Worker(app=self.app, task_events=True)
        consumer = worker.blueprint.steps['celery.worker.components.Consumer']
        consumer_steps = consumer.obj.steps
        assert any(step.alias == 'Events' for step in consumer_steps)
393
394
@mock.stdouts
class test_funs:
    """Tests for module-level helpers and the ``worker`` command entry point.

    ``self.app`` is not defined in this module; it is presumably injected by
    the test-suite's shared fixtures -- TODO confirm against conftest.
    """

    def test_active_thread_count(self):
        """``active_thread_count`` reports a truthy number of threads."""
        count = cd.active_thread_count()
        assert count

    @skip.unless_module('setproctitle')
    def test_set_process_status(self):
        """The status string names the program, host, state and extra argv."""
        worker = Worker(app=self.app, hostname='xyzza')
        # patch.object restores sys.argv on exit, also when an assert fails.
        with patch.object(sys, 'argv', ['Arg0']):
            status = worker.set_process_status('Running')
            for fragment in ('celeryd', 'xyzza', 'Running'):
                assert fragment in status
            with patch.object(sys, 'argv', ['Arg0', 'Arg1']):
                status = worker.set_process_status('Running')
                for fragment in ('celeryd', 'xyzza', 'Running', 'Arg1'):
                    assert fragment in status

    def test_parse_options(self):
        """Numeric command-line options are parsed into integers."""
        cmd = worker()
        cmd.app = self.app
        argv = ['--concurrency=512', '--heartbeat-interval=10']
        parsed = cmd.parse_options('worker', argv)
        opts = parsed[0]
        assert opts['concurrency'] == 512
        assert opts['heartbeat_interval'] == 10

    def test_main(self):
        """``worker_main`` runs with the test Worker and ``--discard``."""
        fake_argv = ['worker', '--discard']
        # Swap in the test double and argv for the duration of the call.
        with patch.object(cd, 'Worker', Worker):
            with patch.object(sys, 'argv', fake_argv):
                worker_main(app=self.app)
438
439
@mock.stdouts
class test_signal_handlers:
    """Tests for the signal-handler installers in ``celery.apps.worker``.

    Handlers are captured with :meth:`psig` instead of being installed, then
    invoked directly.  Tests reset the ``state.should_stop`` and
    ``state.should_terminate`` flags they may set.
    """

    class _Worker(object):
        """Minimal worker stand-in that records stop/terminate calls."""

        hostname = 'foo'
        stopped = False
        terminated = False

        def stop(self, in_sighandler=False):
            self.stopped = True

        def terminate(self, in_sighandler=False):
            self.terminated = True

    def psig(self, fun, *args, **kwargs):
        """Call ``fun`` and return the signal handlers it registered.

        ``platforms.signals`` is replaced for the duration of the call by a
        subclass whose ``__setitem__`` stores into a dict, and is restored
        afterwards even if ``fun`` raises.

        Returns:
            dict mapping the signal key used by ``fun`` to its handler.
        """
        handlers = {}

        class Signals(platforms.Signals):
            def __setitem__(self, sig, handler):
                handlers[sig] = handler

        p, platforms.signals = platforms.signals, Signals()
        try:
            fun(*args, **kwargs)
            return handlers
        finally:
            platforms.signals = p

    def test_worker_int_handler(self):
        """First SIGINT stops; the handler it installs terminates."""
        worker = self._Worker()
        handlers = self.psig(cd.install_worker_int_handler, worker)
        next_handlers = {}
        state.should_stop = None
        state.should_terminate = None

        # Captures the handler registered while the first one runs.
        class Signals(platforms.Signals):

            def __setitem__(self, sig, handler):
                next_handlers[sig] = handler

        # Several threads: the handlers set state flags.
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 3
            p, platforms.signals = platforms.signals, Signals()
            try:
                handlers['SIGINT']('SIGINT', object())
                assert state.should_stop
                assert state.should_stop == EX_FAILURE
            finally:
                platforms.signals = p
                state.should_stop = None

            try:
                next_handlers['SIGINT']('SIGINT', object())
                assert state.should_terminate
                assert state.should_terminate == EX_FAILURE
            finally:
                state.should_terminate = None

        # Single thread: the handlers raise instead.
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 1
            p, platforms.signals = platforms.signals, Signals()
            try:
                with pytest.raises(WorkerShutdown):
                    handlers['SIGINT']('SIGINT', object())
            finally:
                platforms.signals = p

            with pytest.raises(WorkerTerminate):
                next_handlers['SIGINT']('SIGINT', object())

    @skip.unless_module('multiprocessing')
    def test_worker_int_handler_only_stop_MainProcess(self):
        """SIGINT handling while the process is named 'OtherProcess'."""
        process = current_process()
        name, process.name = process.name, 'OtherProcess'
        # Both scenarios must run under the renamed process, so the name
        # is restored once, in the outer ``finally``.
        try:
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 3
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_int_handler, worker)
                handlers['SIGINT']('SIGINT', object())
                assert state.should_stop
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_int_handler, worker)
                with pytest.raises(WorkerShutdown):
                    handlers['SIGINT']('SIGINT', object())
        finally:
            process.name = name
            state.should_stop = None

    def test_install_HUP_not_supported_handler(self):
        """The HUP-not-supported handler can be invoked without error."""
        worker = self._Worker()
        handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
        handlers['SIGHUP']('SIGHUP', object())

    @skip.unless_module('multiprocessing')
    def test_worker_term_hard_handler_only_stop_MainProcess(self):
        """SIGQUIT handling while the process is named 'OtherProcess'."""
        process = current_process()
        name, process.name = process.name, 'OtherProcess'
        try:
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 3
                worker = self._Worker()
                handlers = self.psig(
                    cd.install_worker_term_hard_handler, worker)
                try:
                    handlers['SIGQUIT']('SIGQUIT', object())
                    assert state.should_terminate
                finally:
                    state.should_terminate = None
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(
                    cd.install_worker_term_hard_handler, worker)
                try:
                    with pytest.raises(WorkerTerminate):
                        handlers['SIGQUIT']('SIGQUIT', object())
                finally:
                    state.should_terminate = None
        finally:
            process.name = name

    def test_worker_term_handler_when_threads(self):
        """With several threads SIGTERM sets ``should_stop`` to EX_OK."""
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 3
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_handler, worker)
            try:
                handlers['SIGTERM']('SIGTERM', object())
                assert state.should_stop == EX_OK
            finally:
                state.should_stop = None

    def test_worker_term_handler_when_single_thread(self):
        """With a single thread SIGTERM raises WorkerShutdown."""
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 1
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_handler, worker)
            try:
                with pytest.raises(WorkerShutdown):
                    handlers['SIGTERM']('SIGTERM', object())
            finally:
                state.should_stop = None

    @patch('sys.__stderr__')
    @skip.if_pypy()
    @skip.if_jython()
    def test_worker_cry_handler(self, stderr):
        """The SIGUSR1 handler returns None and writes to ``__stderr__``."""
        handlers = self.psig(cd.install_cry_handler)
        assert handlers['SIGUSR1']('SIGUSR1', object()) is None
        stderr.write.assert_called()

    @skip.unless_module('multiprocessing')
    def test_worker_term_handler_only_stop_MainProcess(self):
        """SIGTERM handling while the process is named 'OtherProcess'."""
        process = current_process()
        name, process.name = process.name, 'OtherProcess'
        try:
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 3
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_term_handler, worker)
                handlers['SIGTERM']('SIGTERM', object())
                assert state.should_stop == EX_OK
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_term_handler, worker)
                with pytest.raises(WorkerShutdown):
                    handlers['SIGTERM']('SIGTERM', object())
        finally:
            process.name = name
            state.should_stop = None

    @skip.unless_symbol('os.execv')
    @patch('celery.platforms.close_open_fds')
    @patch('atexit.register')
    @patch('os.close')
    def test_worker_restart_handler(self, _close, register, close_open):
        """SIGHUP stops the worker and registers an exec at exit."""
        argv = []

        # Record the exec arguments instead of replacing the process.
        def _execv(*args):
            argv.extend(args)

        execv, os.execv = os.execv, _execv
        try:
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_restart_handler, worker)
            handlers['SIGHUP']('SIGHUP', object())
            assert state.should_stop == EX_OK
            register.assert_called()
            # Run the registered atexit callback by hand.
            callback = register.call_args[0][0]
            callback()
            assert argv
        finally:
            os.execv = execv
            state.should_stop = None

    def test_worker_term_hard_handler_when_threaded(self):
        """With several threads SIGQUIT sets ``should_terminate``."""
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 3
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_hard_handler, worker)
            try:
                handlers['SIGQUIT']('SIGQUIT', object())
                assert state.should_terminate
            finally:
                state.should_terminate = None

    def test_worker_term_hard_handler_when_single_threaded(self):
        """With a single thread SIGQUIT raises WorkerTerminate."""
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 1
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_hard_handler, worker)
            with pytest.raises(WorkerTerminate):
                handlers['SIGQUIT']('SIGQUIT', object())

    def test_send_worker_shutting_down_signal(self):
        """SIGTERM sends ``worker_shutting_down`` as a warm shutdown."""
        with patch('celery.apps.worker.signals.worker_shutting_down') as wsd:
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_handler, worker)
            try:
                with pytest.raises(WorkerShutdown):
                    handlers['SIGTERM']('SIGTERM', object())
            finally:
                state.should_stop = None
            wsd.send.assert_called_with(
                sender='foo', sig='SIGTERM', how='Warm', exitcode=0,
            )

    @pytest.mark.xfail(
        not hasattr(signal, "SIGQUIT"),
        reason="Windows does not support SIGQUIT",
        raises=AttributeError,
    )
    def test_send_worker_shutting_down_signal_with_remap_sigquit(self):
        """With REMAP_SIGTERM=SIGQUIT, SIGTERM is a cold shutdown."""
        from billiard import common

        env = patch.dict(os.environ, {"REMAP_SIGTERM": "SIGQUIT"})
        wsd_patch = patch('celery.apps.worker.signals.worker_shutting_down')
        try:
            with env, wsd_patch as wsd:
                # Reload so the modules see the patched environment
                # (presumably REMAP_SIGTERM is read at import time).
                reload_module(common)
                reload_module(cd)

                worker = self._Worker()
                handlers = self.psig(cd.install_worker_term_handler, worker)
                try:
                    with pytest.raises(WorkerTerminate):
                        handlers['SIGTERM']('SIGTERM', object())
                finally:
                    state.should_stop = None
                    state.should_terminate = None
                wsd.send.assert_called_with(
                    sender='foo', sig='SIGTERM', how='Cold', exitcode=1,
                )
        finally:
            # The environment is restored here; reload once more so the
            # remapped module state does not leak into later tests.
            reload_module(common)
            reload_module(cd)
698