# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.

# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

DOCUMENTATION = '''
    name: linear
    short_description: Executes tasks in a linear fashion
    description:
        - Task execution is in lockstep per host batch as defined by C(serial) (default all).
          Up to the fork limit of hosts will execute each task at the same time, and then
          the next series of hosts, until the batch is done, before going on to the next task.
    version_added: "2.0"
    notes:
        - This was the default Ansible behaviour before 'strategy plugins' were introduced in 2.0.
    author: Ansible Core Team
'''

from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleAssertionError
from ansible.executor.play_iterator import PlayIterator
from ansible.module_utils.six import iteritems
from ansible.module_utils._text import to_text
from ansible.playbook.block import Block
from ansible.playbook.included_file import IncludedFile
from ansible.playbook.task import Task
from ansible.plugins.loader import action_loader
from ansible.plugins.strategy import StrategyBase
from ansible.template import Templar
from ansible.utils.display import Display

display = Display()


class StrategyModule(StrategyBase):

    noop_task = None

    def _replace_with_noop(self, target):
        if self.noop_task is None:
            raise AnsibleAssertionError('strategy.linear.StrategyModule.noop_task is None, need Task()')

        result = []
        for el in target:
            if isinstance(el, Task):
                result.append(self.noop_task)
            elif isinstance(el, Block):
                result.append(self._create_noop_block_from(el, el._parent))
        return result

    def _create_noop_block_from(self, original_block, parent):
        noop_block = Block(parent_block=parent)
        noop_block.block = self._replace_with_noop(original_block.block)
        noop_block.always = self._replace_with_noop(original_block.always)
        noop_block.rescue = self._replace_with_noop(original_block.rescue)

        return noop_block

    def _prepare_and_create_noop_block_from(self, original_block, parent, iterator):
        self.noop_task = Task()
        self.noop_task.action = 'meta'
        self.noop_task.args['_raw_params'] = 'noop'
        self.noop_task.implicit = True
        self.noop_task.set_loader(iterator._play._loader)

        return self._create_noop_block_from(original_block, parent)

    def _get_next_task_lockstep(self, hosts, iterator):
        '''
        Returns a list of (host, task) tuples, where the task may
        be a noop task to keep the iterator in lock step across
        all hosts.
        '''
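
        # The filler task built below is the in-memory equivalent of the YAML
        # task ``- meta: noop`` (shown only as an illustration; the strategy
        # builds it directly rather than loading it from YAML). Hosts that
        # have nothing to do at the current lockstep position are handed this
        # task, so every host consumes exactly one step per round.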
        noop_task = Task()
        noop_task.action = 'meta'
        noop_task.args['_raw_params'] = 'noop'
        noop_task.implicit = True
        noop_task.set_loader(iterator._play._loader)

        host_tasks = {}
        display.debug("building list of next tasks for hosts")
        for host in hosts:
            host_tasks[host.name] = iterator.get_next_task_for_host(host, peek=True)
        display.debug("done building task lists")

        num_setups = 0
        num_tasks = 0
        num_rescue = 0
        num_always = 0

        display.debug("counting tasks in each state of execution")
        host_tasks_to_run = [(host, state_task)
                             for host, state_task in iteritems(host_tasks)
                             if state_task and state_task[1]]

        if host_tasks_to_run:
            try:
                lowest_cur_block = min(
                    (iterator.get_active_state(s).cur_block for h, (s, t) in host_tasks_to_run
                     if s.run_state != PlayIterator.ITERATING_COMPLETE))
            except ValueError:
                lowest_cur_block = None
        else:
            # an empty host_tasks_to_run will just run till the end of the function
            # without ever touching lowest_cur_block
            lowest_cur_block = None

        for (k, v) in host_tasks_to_run:
            (s, t) = v

            s = iterator.get_active_state(s)
            if s.cur_block > lowest_cur_block:
                # Not the current block, ignore it
                continue

            if s.run_state == PlayIterator.ITERATING_SETUP:
                num_setups += 1
            elif s.run_state == PlayIterator.ITERATING_TASKS:
                num_tasks += 1
            elif s.run_state == PlayIterator.ITERATING_RESCUE:
                num_rescue += 1
            elif s.run_state == PlayIterator.ITERATING_ALWAYS:
                num_always += 1
        display.debug("done counting tasks in each state of execution:\n\tnum_setups: %s\n\tnum_tasks: %s\n\tnum_rescue: %s\n\tnum_always: %s" % (num_setups,
                                                                                                                                                  num_tasks,
                                                                                                                                                  num_rescue,
                                                                                                                                                  num_always))

        def _advance_selected_hosts(hosts, cur_block, cur_state):
            '''
            This helper returns the task for all hosts in the requested
            state; all other hosts get a noop dummy task. It also advances
            the state of each selected host, since the given states were
            determined using peek=True.
            '''
            # we return the values in the order they were originally
            # specified in the given hosts array
            rvals = []
            display.debug("starting to advance hosts")
            for host in hosts:
                host_state_task = host_tasks.get(host.name)
                if host_state_task is None:
                    continue
                (s, t) = host_state_task
                s = iterator.get_active_state(s)
                if t is None:
                    continue
                if s.run_state == cur_state and s.cur_block == cur_block:
                    # the return value is discarded on purpose: calling without
                    # peek=True advances this host's state past the task we
                    # already captured above
                    new_t = iterator.get_next_task_for_host(host)
                    rvals.append((host, t))
                else:
                    rvals.append((host, noop_task))
            display.debug("done advancing hosts to next task")
            return rvals
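
        # The four branches below are checked in pipeline order: setup runs
        # before regular tasks, which run before rescue, which runs before
        # always. Only the earliest non-empty stage is advanced on each call;
        # all other hosts receive noop_task so the batch stays in lockstep.
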
        # if any hosts are in ITERATING_SETUP, return the setup task
        # while all other hosts get a noop
        if num_setups:
            display.debug("advancing hosts in ITERATING_SETUP")
            return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_SETUP)

        # if any hosts are in ITERATING_TASKS, return the next normal
        # task for these hosts, while all other hosts get a noop
        if num_tasks:
            display.debug("advancing hosts in ITERATING_TASKS")
            return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_TASKS)

        # if any hosts are in ITERATING_RESCUE, return the next rescue
        # task for these hosts, while all other hosts get a noop
        if num_rescue:
            display.debug("advancing hosts in ITERATING_RESCUE")
            return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_RESCUE)

        # if any hosts are in ITERATING_ALWAYS, return the next always
        # task for these hosts, while all other hosts get a noop
        if num_always:
            display.debug("advancing hosts in ITERATING_ALWAYS")
            return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_ALWAYS)

        # at this point, everything must be ITERATING_COMPLETE, so we
        # return None for all hosts in the list
        display.debug("all hosts are done, so returning None's for all hosts")
        return [(host, None) for host in hosts]

    def run(self, iterator, play_context):
        '''
        The linear strategy is simple - get the next task and queue
        it for all hosts, then wait for the queue to drain before
        moving on to the next task.
        '''
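
        # 'result' accumulates TaskQueueManager RUN_* flags as a bitmask
        # (RUN_OK is the zero value; failure flags are OR'ed in below) before
        # being handed to the base class run() for final disposition.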

        # iterate over each task, while there is one left to run
        result = self._tqm.RUN_OK
        work_to_do = True

        self._set_hosts_cache(iterator._play)

        while work_to_do and not self._tqm._terminated:

            try:
                display.debug("getting the remaining hosts for this loop")
                hosts_left = self.get_hosts_left(iterator)
                display.debug("done getting the remaining hosts for this loop")

                # queue up this task for each host in the inventory
                callback_sent = False
                work_to_do = False

                host_results = []
                host_tasks = self._get_next_task_lockstep(hosts_left, iterator)

                # skip control
                skip_rest = False
                choose_step = True

                # flag set if task is set to any_errors_fatal
                any_errors_fatal = False

                results = []
                for (host, task) in host_tasks:
                    if not task:
                        continue

                    if self._tqm._terminated:
                        break

                    run_once = False
                    work_to_do = True

                    # check to see if this task should be skipped, due to it being a member of a
                    # role which has already run (and whether that role allows duplicate execution)
                    if task._role and task._role.has_run(host):
                        # if there is no metadata, the default behavior is to not allow duplicates;
                        # if there is metadata, check whether the allow_duplicates flag was set
                        if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates:
                            display.debug("'%s' skipped because role has already run" % task)
                            continue

                    display.debug("getting variables")
                    task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task,
                                                                _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all)
                    self.add_tqm_variables(task_vars, play=iterator._play)
                    templar = Templar(loader=self._loader, variables=task_vars)
                    display.debug("done getting variables")

                    # test to see if the task across all hosts points to an action plugin which
                    # sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we
                    # will only send this task to the first host in the list.
                    task_action = templar.template(task.action)

                    try:
                        action = action_loader.get(task_action, class_only=True, collection_list=task.collections)
                    except KeyError:
                        # we don't care here, because the action may simply not have a
                        # corresponding action plugin
                        action = None

                    if task_action in C._ACTION_META:
                        # for the linear strategy, we run meta tasks just once and for
                        # all hosts currently being iterated over rather than one host
                        results.extend(self._execute_meta(task, play_context, iterator, host))
                        if task.args.get('_raw_params', None) not in ('noop', 'reset_connection', 'end_host', 'role_complete'):
                            run_once = True
                        if (task.any_errors_fatal or run_once) and not task.ignore_errors:
                            any_errors_fatal = True
                    else:
                        # handle step if needed, skip meta actions as they are used internally
                        if self._step and choose_step:
                            if self._take_step(task):
                                choose_step = False
                            else:
                                skip_rest = True
                                break

                        run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False)

                        if (task.any_errors_fatal or run_once) and not task.ignore_errors:
                            any_errors_fatal = True

                        if not callback_sent:
                            display.debug("sending task start callback, copying the task so we can template it temporarily")
                            saved_name = task.name
                            display.debug("done copying, going to template now")
                            try:
                                task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
                                display.debug("done templating")
                            except Exception:
                                # just ignore any errors during task name templating,
                                # we don't care if it just shows the raw name
                                display.debug("templating failed for some reason")
                            display.debug("here goes the callback...")
                            self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
                            task.name = saved_name
                            callback_sent = True
                            display.debug("done sending task start callback")

                    self._blocked_hosts[host.get_name()] = True
                    self._queue_task(host, task, task_vars, play_context)
                    del task_vars

                    # if we're bypassing the host loop, break out now
                    if run_once:
                        break

                    # opportunistically reap a few finished workers between queue operations
                    results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1)))

                # go to next host/task group
                if skip_rest:
                    continue

                display.debug("done queuing things up, now waiting for results queue to drain")
                if self._pending_results > 0:
                    results += self._wait_on_pending_results(iterator)

                host_results.extend(results)

                self.update_active_connections(results)
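
                # process_include_results() collapses matching include results:
                # hosts that returned the same include_tasks/include_role share
                # a single IncludedFile entry, which records its originating
                # hosts (its _hosts attribute drives the real-block vs.
                # noop-block split below).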
                included_files = IncludedFile.process_include_results(
                    host_results,
                    iterator=iterator,
                    loader=self._loader,
                    variable_manager=self._variable_manager
                )

                include_failure = False
                if len(included_files) > 0:
                    display.debug("we have included files to process")

                    display.debug("generating all_blocks data")
                    all_blocks = dict((host, []) for host in hosts_left)
                    display.debug("done generating all_blocks data")
                    for included_file in included_files:
                        display.debug("processing included file: %s" % included_file._filename)
                        # included hosts get the task list while those excluded get an equal-length
                        # list of noop tasks, to make sure that they continue running in lock-step
                        try:
                            if included_file._is_role:
                                new_ir = self._copy_included_file(included_file)

                                new_blocks, handler_blocks = new_ir.get_block_list(
                                    play=iterator._play,
                                    variable_manager=self._variable_manager,
                                    loader=self._loader,
                                )
                            else:
                                new_blocks = self._load_included_file(included_file, iterator=iterator)

                            display.debug("iterating over new_blocks loaded from include file")
                            for new_block in new_blocks:
                                task_vars = self._variable_manager.get_vars(
                                    play=iterator._play,
                                    task=new_block.get_first_parent_include(),
                                    _hosts=self._hosts_cache,
                                    _hosts_all=self._hosts_cache_all,
                                )
                                display.debug("filtering new block on tags")
                                final_block = new_block.filter_tagged_tasks(task_vars)
                                display.debug("done filtering new block on tags")

                                noop_block = self._prepare_and_create_noop_block_from(final_block, task._parent, iterator)

                                for host in hosts_left:
                                    if host in included_file._hosts:
                                        all_blocks[host].append(final_block)
                                    else:
                                        all_blocks[host].append(noop_block)
                            display.debug("done iterating over new_blocks loaded from include file")

                        except AnsibleError as e:
                            for host in included_file._hosts:
                                self._tqm._failed_hosts[host.name] = True
                                iterator.mark_host_failed(host)
                            display.error(to_text(e), wrap_text=False)
                            include_failure = True
                            continue

                    # finally go through all of the hosts and append the
                    # accumulated blocks to their list of tasks
                    display.debug("extending task lists for all hosts with included blocks")

                    for host in hosts_left:
                        iterator.add_tasks(host, all_blocks[host])

                    display.debug("done extending task lists")
                    display.debug("done processing included files")

                display.debug("results queue empty")
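
                # Failure accounting for any_errors_fatal: a single failed or
                # unreachable host marks the whole batch as failed, unless a
                # host is already inside a rescue/always section handling the
                # error. Illustrative playbook snippet (the command path is
                # hypothetical):
                #
                #   - hosts: web
                #     any_errors_fatal: true
                #     tasks:
                #       - command: /usr/bin/flaky   # one host failing here...
                #       - ping:                     # ...stops the play for all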
                display.debug("checking for any_errors_fatal")
                failed_hosts = []
                unreachable_hosts = []
                for res in results:
                    # _execute_meta() does not set 'failed' in the TaskResult,
                    # so we skip checking it for meta tasks and look just at the iterator
                    if (res.is_failed() or res._task.action in C._ACTION_META) and iterator.is_failed(res._host):
                        failed_hosts.append(res._host.name)
                    elif res.is_unreachable():
                        unreachable_hosts.append(res._host.name)

                # if any_errors_fatal and we had an error, mark all hosts as failed
                if any_errors_fatal and (len(failed_hosts) > 0 or len(unreachable_hosts) > 0):
                    dont_fail_states = frozenset([iterator.ITERATING_RESCUE, iterator.ITERATING_ALWAYS])
                    for host in hosts_left:
                        (s, _) = iterator.get_next_task_for_host(host, peek=True)
                        # the state may actually be in a child state, use the get_active_state()
                        # method in the iterator to figure out the true active state
                        s = iterator.get_active_state(s)
                        if s.run_state not in dont_fail_states or \
                           s.run_state == iterator.ITERATING_RESCUE and s.fail_state & iterator.FAILED_RESCUE != 0:
                            self._tqm._failed_hosts[host.name] = True
                            result |= self._tqm.RUN_FAILED_BREAK_PLAY
                display.debug("done checking for any_errors_fatal")

                display.debug("checking for max_fail_percentage")
                if iterator._play.max_fail_percentage is not None and len(results) > 0:
                    percentage = iterator._play.max_fail_percentage / 100.0

                    if (len(self._tqm._failed_hosts) / iterator.batch_size) > percentage:
                        for host in hosts_left:
                            # don't double-mark hosts, or the iterator will potentially
                            # fail them out of the rescue/always states
                            if host.name not in failed_hosts:
                                self._tqm._failed_hosts[host.name] = True
                                iterator.mark_host_failed(host)
                        self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
                        result |= self._tqm.RUN_FAILED_BREAK_PLAY
                    display.debug('(%s failed / %s total) > %s max fail' % (len(self._tqm._failed_hosts), iterator.batch_size, percentage))
                display.debug("done checking for max_fail_percentage")

                display.debug("checking to see if all hosts have failed and the running result is not ok")
                if result != self._tqm.RUN_OK and len(self._tqm._failed_hosts) >= len(hosts_left):
                    display.debug("^ not ok, so returning result now")
                    self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
                    return result
                display.debug("done checking to see if all hosts have failed")

            except (IOError, EOFError) as e:
                display.debug("got IOError/EOFError in task loop: %s" % e)
                # most likely an abort, return failed
                return self._tqm.RUN_UNKNOWN_ERROR

        # run the base class run() method, which executes the cleanup function
        # and runs any outstanding handlers which have been triggered

        return super(StrategyModule, self).run(iterator, play_context, result)