1# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> 2# 3# This file is part of Ansible 4# 5# Ansible is free software: you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation, either version 3 of the License, or 8# (at your option) any later version. 9# 10# Ansible is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with Ansible. If not, see <http://www.gnu.org/licenses/>. 17 18# Make coding more python3-ish 19from __future__ import (absolute_import, division, print_function) 20__metaclass__ = type 21 22import os 23import sys 24import tempfile 25import time 26 27from ansible import constants as C 28from ansible import context 29from ansible.errors import AnsibleError 30from ansible.executor.play_iterator import PlayIterator 31from ansible.executor.stats import AggregateStats 32from ansible.executor.task_result import TaskResult 33from ansible.module_utils.six import string_types 34from ansible.module_utils._text import to_text, to_native 35from ansible.playbook.block import Block 36from ansible.playbook.play_context import PlayContext 37from ansible.plugins.loader import callback_loader, strategy_loader, module_loader 38from ansible.plugins.callback import CallbackBase 39from ansible.template import Templar 40from ansible.utils.collection_loader import AnsibleCollectionRef 41from ansible.utils.helpers import pct_to_int 42from ansible.vars.hostvars import HostVars 43from ansible.vars.reserved import warn_if_reserved 44from ansible.utils.display import Display 45from ansible.utils.multiprocessing import context as multiprocessing_context 46 47 48__all__ = ['TaskQueueManager'] 49 50display = Display() 51 52 53class TaskQueueManager: 54 55 ''' 56 This class handles the multiprocessing requirements of Ansible by 57 creating a pool of worker forks, a result handler fork, and a 58 manager object with shared datastructures/queues for coordinating 59 work between all processes. 60 61 The queue manager is responsible for loading the play strategy plugin, 62 which dispatches the Play's tasks to hosts. 63 ''' 64 65 RUN_OK = 0 66 RUN_ERROR = 1 67 RUN_FAILED_HOSTS = 2 68 RUN_UNREACHABLE_HOSTS = 4 69 RUN_FAILED_BREAK_PLAY = 8 70 RUN_UNKNOWN_ERROR = 255 71 72 def __init__(self, inventory, variable_manager, loader, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False, forks=None): 73 74 self._inventory = inventory 75 self._variable_manager = variable_manager 76 self._loader = loader 77 self._stats = AggregateStats() 78 self.passwords = passwords 79 self._stdout_callback = stdout_callback 80 self._run_additional_callbacks = run_additional_callbacks 81 self._run_tree = run_tree 82 self._forks = forks or 5 83 84 self._callbacks_loaded = False 85 self._callback_plugins = [] 86 self._start_at_done = False 87 88 # make sure any module paths (if specified) are added to the module_loader 89 if context.CLIARGS.get('module_path', False): 90 for path in context.CLIARGS['module_path']: 91 if path: 92 module_loader.add_directory(path) 93 94 # a special flag to help us exit cleanly 95 self._terminated = False 96 97 # dictionaries to keep track of failed/unreachable hosts 98 self._failed_hosts = dict() 99 self._unreachable_hosts = dict() 100 101 try: 102 self._final_q = multiprocessing_context.Queue() 103 except OSError as e: 104 raise AnsibleError("Unable to use multiprocessing, this is normally caused by lack of access to /dev/shm: %s" % to_native(e)) 105 106 # A temporary file (opened pre-fork) used by connection 107 # plugins for inter-process locking. 108 self._connection_lockfile = tempfile.TemporaryFile() 109 110 def _initialize_processes(self, num): 111 self._workers = [] 112 113 for i in range(num): 114 self._workers.append(None) 115 116 def load_callbacks(self): 117 ''' 118 Loads all available callbacks, with the exception of those which 119 utilize the CALLBACK_TYPE option. When CALLBACK_TYPE is set to 'stdout', 120 only one such callback plugin will be loaded. 121 ''' 122 123 if self._callbacks_loaded: 124 return 125 126 stdout_callback_loaded = False 127 if self._stdout_callback is None: 128 self._stdout_callback = C.DEFAULT_STDOUT_CALLBACK 129 130 if isinstance(self._stdout_callback, CallbackBase): 131 stdout_callback_loaded = True 132 elif isinstance(self._stdout_callback, string_types): 133 if self._stdout_callback not in callback_loader: 134 raise AnsibleError("Invalid callback for stdout specified: %s" % self._stdout_callback) 135 else: 136 self._stdout_callback = callback_loader.get(self._stdout_callback) 137 self._stdout_callback.set_options() 138 stdout_callback_loaded = True 139 else: 140 raise AnsibleError("callback must be an instance of CallbackBase or the name of a callback plugin") 141 142 # get all configured loadable callbacks (adjacent, builtin) 143 callback_list = list(callback_loader.all(class_only=True)) 144 145 # add whitelisted callbacks that refer to collections, which might not appear in normal listing 146 for c in C.DEFAULT_CALLBACK_WHITELIST: 147 # load all, as collection ones might be using short/redirected names and not a fqcn 148 plugin = callback_loader.get(c, class_only=True) 149 150 # TODO: check if this skip is redundant, loader should handle bad file/plugin cases already 151 if plugin: 152 # avoids incorrect and dupes possible due to collections 153 if plugin not in callback_list: 154 setattr(plugin, '_redirected_names', [c]) # here for backport as newer versions of plugin_loader already do this 155 callback_list.append(plugin) 156 else: 157 display.warning("Skipping callback plugin '%s', unable to load" % c) 158 159 # for each callback in the list see if we should add it to 'active callbacks' used in the play 160 for callback_plugin in callback_list: 161 162 callback_type = getattr(callback_plugin, 'CALLBACK_TYPE', '') 163 callback_needs_whitelist = getattr(callback_plugin, 'CALLBACK_NEEDS_WHITELIST', False) 164 165 # try to get colleciotn world name first 166 cnames = getattr(callback_plugin, '_redirected_names', []) 167 if cnames: 168 # store the name the plugin was loaded as, as that's what we'll need to compare to the configured callback list later 169 callback_name = cnames[0] 170 else: 171 # fallback to 'old loader name' 172 (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path)) 173 174 display.vvvvv("Attempting to use '%s' callback." % (callback_name)) 175 if callback_type == 'stdout': 176 # we only allow one callback of type 'stdout' to be loaded, 177 if callback_name != self._stdout_callback or stdout_callback_loaded: 178 display.vv("Skipping callback '%s', as we already have a stdout callback." % (callback_name)) 179 continue 180 stdout_callback_loaded = True 181 elif callback_name == 'tree' and self._run_tree: 182 # TODO: remove special case for tree, which is an adhoc cli option --tree 183 pass 184 elif not self._run_additional_callbacks or (callback_needs_whitelist and ( 185 # only run if not adhoc, or adhoc was specifically configured to run + check enabled list 186 C.DEFAULT_CALLBACK_WHITELIST is None or callback_name not in C.DEFAULT_CALLBACK_WHITELIST)): 187 # 2.x plugins shipped with ansible should require whitelisting, older or non shipped should load automatically 188 continue 189 190 try: 191 callback_obj = callback_plugin() 192 # avoid bad plugin not returning an object, only needed cause we do class_only load and bypass loader checks, 193 # really a bug in the plugin itself which we ignore as callback errors are not supposed to be fatal. 194 if callback_obj: 195 # skip initializing if we already did the work for the same plugin (even with diff names) 196 if callback_obj not in self._callback_plugins: 197 callback_obj.set_options() 198 self._callback_plugins.append(callback_obj) 199 else: 200 display.vv("Skipping callback '%s', already loaded as '%s'." % (callback_plugin, callback_name)) 201 else: 202 display.warning("Skipping callback '%s', as it does not create a valid plugin instance." % callback_name) 203 continue 204 except Exception as e: 205 display.warning("Skipping callback '%s', unable to load due to: %s" % (callback_name, to_native(e))) 206 continue 207 208 self._callbacks_loaded = True 209 210 def run(self, play): 211 ''' 212 Iterates over the roles/tasks in a play, using the given (or default) 213 strategy for queueing tasks. The default is the linear strategy, which 214 operates like classic Ansible by keeping all hosts in lock-step with 215 a given task (meaning no hosts move on to the next task until all hosts 216 are done with the current task). 217 ''' 218 219 if not self._callbacks_loaded: 220 self.load_callbacks() 221 222 all_vars = self._variable_manager.get_vars(play=play) 223 warn_if_reserved(all_vars) 224 templar = Templar(loader=self._loader, variables=all_vars) 225 226 new_play = play.copy() 227 new_play.post_validate(templar) 228 new_play.handlers = new_play.compile_roles_handlers() + new_play.handlers 229 230 self.hostvars = HostVars( 231 inventory=self._inventory, 232 variable_manager=self._variable_manager, 233 loader=self._loader, 234 ) 235 236 play_context = PlayContext(new_play, self.passwords, self._connection_lockfile.fileno()) 237 if (self._stdout_callback and 238 hasattr(self._stdout_callback, 'set_play_context')): 239 self._stdout_callback.set_play_context(play_context) 240 241 for callback_plugin in self._callback_plugins: 242 if hasattr(callback_plugin, 'set_play_context'): 243 callback_plugin.set_play_context(play_context) 244 245 self.send_callback('v2_playbook_on_play_start', new_play) 246 247 # build the iterator 248 iterator = PlayIterator( 249 inventory=self._inventory, 250 play=new_play, 251 play_context=play_context, 252 variable_manager=self._variable_manager, 253 all_vars=all_vars, 254 start_at_done=self._start_at_done, 255 ) 256 257 # adjust to # of workers to configured forks or size of batch, whatever is lower 258 self._initialize_processes(min(self._forks, iterator.batch_size)) 259 260 # load the specified strategy (or the default linear one) 261 strategy = strategy_loader.get(new_play.strategy, self) 262 if strategy is None: 263 raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds) 264 265 # Because the TQM may survive multiple play runs, we start by marking 266 # any hosts as failed in the iterator here which may have been marked 267 # as failed in previous runs. Then we clear the internal list of failed 268 # hosts so we know what failed this round. 269 for host_name in self._failed_hosts.keys(): 270 host = self._inventory.get_host(host_name) 271 iterator.mark_host_failed(host) 272 273 self.clear_failed_hosts() 274 275 # during initialization, the PlayContext will clear the start_at_task 276 # field to signal that a matching task was found, so check that here 277 # and remember it so we don't try to skip tasks on future plays 278 if context.CLIARGS.get('start_at_task') is not None and play_context.start_at_task is None: 279 self._start_at_done = True 280 281 # and run the play using the strategy and cleanup on way out 282 play_return = strategy.run(iterator, play_context) 283 284 # now re-save the hosts that failed from the iterator to our internal list 285 for host_name in iterator.get_failed_hosts(): 286 self._failed_hosts[host_name] = True 287 288 strategy.cleanup() 289 self._cleanup_processes() 290 return play_return 291 292 def cleanup(self): 293 display.debug("RUNNING CLEANUP") 294 self.terminate() 295 self._final_q.close() 296 self._cleanup_processes() 297 298 # A bug exists in Python 2.6 that causes an exception to be raised during 299 # interpreter shutdown. This is only an issue in our CI testing but we 300 # hit it frequently enough to add a small sleep to avoid the issue. 301 # This can be removed once we have split controller available in CI. 302 # 303 # Further information: 304 # Issue: https://bugs.python.org/issue4106 305 # Fix: https://hg.python.org/cpython/rev/d316315a8781 306 # 307 try: 308 if (2, 6) == (sys.version_info[0:2]): 309 time.sleep(0.0001) 310 except (IndexError, AttributeError): 311 # In case there is an issue getting the version info, don't raise an Exception 312 pass 313 314 def _cleanup_processes(self): 315 if hasattr(self, '_workers'): 316 for worker_prc in self._workers: 317 if worker_prc and worker_prc.is_alive(): 318 try: 319 worker_prc.terminate() 320 except AttributeError: 321 pass 322 323 def clear_failed_hosts(self): 324 self._failed_hosts = dict() 325 326 def get_inventory(self): 327 return self._inventory 328 329 def get_variable_manager(self): 330 return self._variable_manager 331 332 def get_loader(self): 333 return self._loader 334 335 def get_workers(self): 336 return self._workers[:] 337 338 def terminate(self): 339 self._terminated = True 340 341 def has_dead_workers(self): 342 343 # [<WorkerProcess(WorkerProcess-2, stopped[SIGKILL])>, 344 # <WorkerProcess(WorkerProcess-2, stopped[SIGTERM])> 345 346 defunct = False 347 for x in self._workers: 348 if getattr(x, 'exitcode', None): 349 defunct = True 350 return defunct 351 352 def send_callback(self, method_name, *args, **kwargs): 353 for callback_plugin in [self._stdout_callback] + self._callback_plugins: 354 # a plugin that set self.disabled to True will not be called 355 # see osx_say.py example for such a plugin 356 if getattr(callback_plugin, 'disabled', False): 357 continue 358 359 # try to find v2 method, fallback to v1 method, ignore callback if no method found 360 methods = [] 361 for possible in [method_name, 'v2_on_any']: 362 gotit = getattr(callback_plugin, possible, None) 363 if gotit is None: 364 gotit = getattr(callback_plugin, possible.replace('v2_', ''), None) 365 if gotit is not None: 366 methods.append(gotit) 367 368 # send clean copies 369 new_args = [] 370 for arg in args: 371 # FIXME: add play/task cleaners 372 if isinstance(arg, TaskResult): 373 new_args.append(arg.clean_copy()) 374 # elif isinstance(arg, Play): 375 # elif isinstance(arg, Task): 376 else: 377 new_args.append(arg) 378 379 for method in methods: 380 try: 381 method(*new_args, **kwargs) 382 except Exception as e: 383 # TODO: add config toggle to make this fatal or not? 384 display.warning(u"Failure using method (%s) in callback plugin (%s): %s" % (to_text(method_name), to_text(callback_plugin), to_text(e))) 385 from traceback import format_tb 386 from sys import exc_info 387 display.vvv('Callback Exception: \n' + ' '.join(format_tb(exc_info()[2]))) 388