1# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
2#
3# This file is part of Ansible
4#
5# Ansible is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9#
10# Ansible is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
17
18# Make coding more python3-ish
19from __future__ import (absolute_import, division, print_function)
20__metaclass__ = type
21
22import os
23import sys
24import tempfile
25import time
26
27from ansible import constants as C
28from ansible import context
29from ansible.errors import AnsibleError
30from ansible.executor.play_iterator import PlayIterator
31from ansible.executor.stats import AggregateStats
32from ansible.executor.task_result import TaskResult
33from ansible.module_utils.six import string_types
34from ansible.module_utils._text import to_text, to_native
35from ansible.playbook.block import Block
36from ansible.playbook.play_context import PlayContext
37from ansible.plugins.loader import callback_loader, strategy_loader, module_loader
38from ansible.plugins.callback import CallbackBase
39from ansible.template import Templar
40from ansible.utils.collection_loader import AnsibleCollectionRef
41from ansible.utils.helpers import pct_to_int
42from ansible.vars.hostvars import HostVars
43from ansible.vars.reserved import warn_if_reserved
44from ansible.utils.display import Display
45from ansible.utils.multiprocessing import context as multiprocessing_context
46
47
# Public names of this module (what a star-import exposes).
__all__ = ['TaskQueueManager']

# Module-wide Display instance; the code in this file uses it for its
# debug, verbose (vv/vvv/vvvvv) and warning output.
display = Display()
51
52
class TaskQueueManager:

    '''
    This class handles the multiprocessing requirements of Ansible by
    creating a pool of worker forks, a result handler fork, and a
    manager object with shared datastructures/queues for coordinating
    work between all processes.

    The queue manager is responsible for loading the play strategy plugin,
    which dispatches the Play's tasks to hosts.
    '''

    # Result codes published as class attributes. No method of this class
    # reads them; they are here for other code to use.
    # NOTE(review): apart from 255 the non-zero values are distinct powers of
    # two, which suggests they may be OR-ed together by callers - confirm
    # against the code that consumes them.
    RUN_OK = 0
    RUN_ERROR = 1
    RUN_FAILED_HOSTS = 2
    RUN_UNREACHABLE_HOSTS = 4
    RUN_FAILED_BREAK_PLAY = 8
    RUN_UNKNOWN_ERROR = 255
71
72    def __init__(self, inventory, variable_manager, loader, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False, forks=None):
73
74        self._inventory = inventory
75        self._variable_manager = variable_manager
76        self._loader = loader
77        self._stats = AggregateStats()
78        self.passwords = passwords
79        self._stdout_callback = stdout_callback
80        self._run_additional_callbacks = run_additional_callbacks
81        self._run_tree = run_tree
82        self._forks = forks or 5
83
84        self._callbacks_loaded = False
85        self._callback_plugins = []
86        self._start_at_done = False
87
88        # make sure any module paths (if specified) are added to the module_loader
89        if context.CLIARGS.get('module_path', False):
90            for path in context.CLIARGS['module_path']:
91                if path:
92                    module_loader.add_directory(path)
93
94        # a special flag to help us exit cleanly
95        self._terminated = False
96
97        # dictionaries to keep track of failed/unreachable hosts
98        self._failed_hosts = dict()
99        self._unreachable_hosts = dict()
100
101        try:
102            self._final_q = multiprocessing_context.Queue()
103        except OSError as e:
104            raise AnsibleError("Unable to use multiprocessing, this is normally caused by lack of access to /dev/shm: %s" % to_native(e))
105
106        # A temporary file (opened pre-fork) used by connection
107        # plugins for inter-process locking.
108        self._connection_lockfile = tempfile.TemporaryFile()
109
110    def _initialize_processes(self, num):
111        self._workers = []
112
113        for i in range(num):
114            self._workers.append(None)
115
116    def load_callbacks(self):
117        '''
118        Loads all available callbacks, with the exception of those which
119        utilize the CALLBACK_TYPE option. When CALLBACK_TYPE is set to 'stdout',
120        only one such callback plugin will be loaded.
121        '''
122
123        if self._callbacks_loaded:
124            return
125
126        stdout_callback_loaded = False
127        if self._stdout_callback is None:
128            self._stdout_callback = C.DEFAULT_STDOUT_CALLBACK
129
130        if isinstance(self._stdout_callback, CallbackBase):
131            stdout_callback_loaded = True
132        elif isinstance(self._stdout_callback, string_types):
133            if self._stdout_callback not in callback_loader:
134                raise AnsibleError("Invalid callback for stdout specified: %s" % self._stdout_callback)
135            else:
136                self._stdout_callback = callback_loader.get(self._stdout_callback)
137                self._stdout_callback.set_options()
138                stdout_callback_loaded = True
139        else:
140            raise AnsibleError("callback must be an instance of CallbackBase or the name of a callback plugin")
141
142        # get all configured loadable callbacks (adjacent, builtin)
143        callback_list = list(callback_loader.all(class_only=True))
144
145        # add whitelisted callbacks that refer to collections, which might not appear in normal listing
146        for c in C.DEFAULT_CALLBACK_WHITELIST:
147            # load all, as collection ones might be using short/redirected names and not a fqcn
148            plugin = callback_loader.get(c, class_only=True)
149
150            # TODO: check if this skip is redundant, loader should handle bad file/plugin cases already
151            if plugin:
152                # avoids incorrect and dupes possible due to collections
153                if plugin not in callback_list:
154                    setattr(plugin, '_redirected_names', [c])  # here for backport as newer versions of plugin_loader already do this
155                    callback_list.append(plugin)
156            else:
157                display.warning("Skipping callback plugin '%s', unable to load" % c)
158
159        # for each callback in the list see if we should add it to 'active callbacks' used in the play
160        for callback_plugin in callback_list:
161
162            callback_type = getattr(callback_plugin, 'CALLBACK_TYPE', '')
163            callback_needs_whitelist = getattr(callback_plugin, 'CALLBACK_NEEDS_WHITELIST', False)
164
165            # try to get colleciotn world name first
166            cnames = getattr(callback_plugin, '_redirected_names', [])
167            if cnames:
168                # store the name the plugin was loaded as, as that's what we'll need to compare to the configured callback list later
169                callback_name = cnames[0]
170            else:
171                # fallback to 'old loader name'
172                (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path))
173
174            display.vvvvv("Attempting to use '%s' callback." % (callback_name))
175            if callback_type == 'stdout':
176                # we only allow one callback of type 'stdout' to be loaded,
177                if callback_name != self._stdout_callback or stdout_callback_loaded:
178                    display.vv("Skipping callback '%s', as we already have a stdout callback." % (callback_name))
179                    continue
180                stdout_callback_loaded = True
181            elif callback_name == 'tree' and self._run_tree:
182                # TODO: remove special case for tree, which is an adhoc cli option --tree
183                pass
184            elif not self._run_additional_callbacks or (callback_needs_whitelist and (
185                # only run if not adhoc, or adhoc was specifically configured to run + check enabled list
186                    C.DEFAULT_CALLBACK_WHITELIST is None or callback_name not in C.DEFAULT_CALLBACK_WHITELIST)):
187                # 2.x plugins shipped with ansible should require whitelisting, older or non shipped should load automatically
188                continue
189
190            try:
191                callback_obj = callback_plugin()
192                # avoid bad plugin not returning an object, only needed cause we do class_only load and bypass loader checks,
193                # really a bug in the plugin itself which we ignore as callback errors are not supposed to be fatal.
194                if callback_obj:
195                    # skip initializing if we already did the work for the same plugin (even with diff names)
196                    if callback_obj not in self._callback_plugins:
197                        callback_obj.set_options()
198                        self._callback_plugins.append(callback_obj)
199                    else:
200                        display.vv("Skipping callback '%s', already loaded as '%s'." % (callback_plugin, callback_name))
201                else:
202                    display.warning("Skipping callback '%s', as it does not create a valid plugin instance." % callback_name)
203                    continue
204            except Exception as e:
205                display.warning("Skipping callback '%s', unable to load due to: %s" % (callback_name, to_native(e)))
206                continue
207
208        self._callbacks_loaded = True
209
210    def run(self, play):
211        '''
212        Iterates over the roles/tasks in a play, using the given (or default)
213        strategy for queueing tasks. The default is the linear strategy, which
214        operates like classic Ansible by keeping all hosts in lock-step with
215        a given task (meaning no hosts move on to the next task until all hosts
216        are done with the current task).
217        '''
218
219        if not self._callbacks_loaded:
220            self.load_callbacks()
221
222        all_vars = self._variable_manager.get_vars(play=play)
223        warn_if_reserved(all_vars)
224        templar = Templar(loader=self._loader, variables=all_vars)
225
226        new_play = play.copy()
227        new_play.post_validate(templar)
228        new_play.handlers = new_play.compile_roles_handlers() + new_play.handlers
229
230        self.hostvars = HostVars(
231            inventory=self._inventory,
232            variable_manager=self._variable_manager,
233            loader=self._loader,
234        )
235
236        play_context = PlayContext(new_play, self.passwords, self._connection_lockfile.fileno())
237        if (self._stdout_callback and
238                hasattr(self._stdout_callback, 'set_play_context')):
239            self._stdout_callback.set_play_context(play_context)
240
241        for callback_plugin in self._callback_plugins:
242            if hasattr(callback_plugin, 'set_play_context'):
243                callback_plugin.set_play_context(play_context)
244
245        self.send_callback('v2_playbook_on_play_start', new_play)
246
247        # build the iterator
248        iterator = PlayIterator(
249            inventory=self._inventory,
250            play=new_play,
251            play_context=play_context,
252            variable_manager=self._variable_manager,
253            all_vars=all_vars,
254            start_at_done=self._start_at_done,
255        )
256
257        # adjust to # of workers to configured forks or size of batch, whatever is lower
258        self._initialize_processes(min(self._forks, iterator.batch_size))
259
260        # load the specified strategy (or the default linear one)
261        strategy = strategy_loader.get(new_play.strategy, self)
262        if strategy is None:
263            raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds)
264
265        # Because the TQM may survive multiple play runs, we start by marking
266        # any hosts as failed in the iterator here which may have been marked
267        # as failed in previous runs. Then we clear the internal list of failed
268        # hosts so we know what failed this round.
269        for host_name in self._failed_hosts.keys():
270            host = self._inventory.get_host(host_name)
271            iterator.mark_host_failed(host)
272
273        self.clear_failed_hosts()
274
275        # during initialization, the PlayContext will clear the start_at_task
276        # field to signal that a matching task was found, so check that here
277        # and remember it so we don't try to skip tasks on future plays
278        if context.CLIARGS.get('start_at_task') is not None and play_context.start_at_task is None:
279            self._start_at_done = True
280
281        # and run the play using the strategy and cleanup on way out
282        play_return = strategy.run(iterator, play_context)
283
284        # now re-save the hosts that failed from the iterator to our internal list
285        for host_name in iterator.get_failed_hosts():
286            self._failed_hosts[host_name] = True
287
288        strategy.cleanup()
289        self._cleanup_processes()
290        return play_return
291
292    def cleanup(self):
293        display.debug("RUNNING CLEANUP")
294        self.terminate()
295        self._final_q.close()
296        self._cleanup_processes()
297
298        # A bug exists in Python 2.6 that causes an exception to be raised during
299        # interpreter shutdown. This is only an issue in our CI testing but we
300        # hit it frequently enough to add a small sleep to avoid the issue.
301        # This can be removed once we have split controller available in CI.
302        #
303        # Further information:
304        #     Issue: https://bugs.python.org/issue4106
305        #     Fix:   https://hg.python.org/cpython/rev/d316315a8781
306        #
307        try:
308            if (2, 6) == (sys.version_info[0:2]):
309                time.sleep(0.0001)
310        except (IndexError, AttributeError):
311            # In case there is an issue getting the version info, don't raise an Exception
312            pass
313
314    def _cleanup_processes(self):
315        if hasattr(self, '_workers'):
316            for worker_prc in self._workers:
317                if worker_prc and worker_prc.is_alive():
318                    try:
319                        worker_prc.terminate()
320                    except AttributeError:
321                        pass
322
323    def clear_failed_hosts(self):
324        self._failed_hosts = dict()
325
326    def get_inventory(self):
327        return self._inventory
328
329    def get_variable_manager(self):
330        return self._variable_manager
331
332    def get_loader(self):
333        return self._loader
334
335    def get_workers(self):
336        return self._workers[:]
337
338    def terminate(self):
339        self._terminated = True
340
341    def has_dead_workers(self):
342
343        # [<WorkerProcess(WorkerProcess-2, stopped[SIGKILL])>,
344        # <WorkerProcess(WorkerProcess-2, stopped[SIGTERM])>
345
346        defunct = False
347        for x in self._workers:
348            if getattr(x, 'exitcode', None):
349                defunct = True
350        return defunct
351
352    def send_callback(self, method_name, *args, **kwargs):
353        for callback_plugin in [self._stdout_callback] + self._callback_plugins:
354            # a plugin that set self.disabled to True will not be called
355            # see osx_say.py example for such a plugin
356            if getattr(callback_plugin, 'disabled', False):
357                continue
358
359            # try to find v2 method, fallback to v1 method, ignore callback if no method found
360            methods = []
361            for possible in [method_name, 'v2_on_any']:
362                gotit = getattr(callback_plugin, possible, None)
363                if gotit is None:
364                    gotit = getattr(callback_plugin, possible.replace('v2_', ''), None)
365                if gotit is not None:
366                    methods.append(gotit)
367
368            # send clean copies
369            new_args = []
370            for arg in args:
371                # FIXME: add play/task cleaners
372                if isinstance(arg, TaskResult):
373                    new_args.append(arg.clean_copy())
374                # elif isinstance(arg, Play):
375                # elif isinstance(arg, Task):
376                else:
377                    new_args.append(arg)
378
379            for method in methods:
380                try:
381                    method(*new_args, **kwargs)
382                except Exception as e:
383                    # TODO: add config toggle to make this fatal or not?
384                    display.warning(u"Failure using method (%s) in callback plugin (%s): %s" % (to_text(method_name), to_text(callback_plugin), to_text(e)))
385                    from traceback import format_tb
386                    from sys import exc_info
387                    display.vvv('Callback Exception: \n' + ' '.join(format_tb(exc_info()[2])))
388