1# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> 2# 3# This file is part of Ansible 4# 5# Ansible is free software: you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation, either version 3 of the License, or 8# (at your option) any later version. 9# 10# Ansible is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with Ansible. If not, see <http://www.gnu.org/licenses/>. 17# Make coding more python3-ish 18from __future__ import (absolute_import, division, print_function) 19__metaclass__ = type 20 21DOCUMENTATION = ''' 22 name: free 23 short_description: Executes tasks without waiting for all hosts 24 description: 25 - Task execution is as fast as possible per batch as defined by C(serial) (default all). 26 Ansible will not wait for other hosts to finish the current task before queuing more tasks for other hosts. 27 All hosts are still attempted for the current task, but it prevents blocking new tasks for hosts that have already finished. 28 - With the free strategy, unlike the default linear strategy, a host that is slow or stuck on a specific task 29 won't hold up the rest of the hosts and tasks. 30 version_added: "2.0" 31 author: Ansible Core Team 32''' 33 34import time 35 36from ansible import constants as C 37from ansible.errors import AnsibleError 38from ansible.playbook.included_file import IncludedFile 39from ansible.plugins.loader import action_loader 40from ansible.plugins.strategy import StrategyBase 41from ansible.template import Templar 42from ansible.module_utils._text import to_text 43from ansible.utils.display import Display 44 45display = Display() 46 47 48class StrategyModule(StrategyBase): 49 50 # This strategy manages throttling on its own, so we don't want it done in queue_task 51 ALLOW_BASE_THROTTLING = False 52 53 def _filter_notified_failed_hosts(self, iterator, notified_hosts): 54 55 # If --force-handlers is used we may act on hosts that have failed 56 return [host for host in notified_hosts if iterator.is_failed(host)] 57 58 def _filter_notified_hosts(self, notified_hosts): 59 ''' 60 Filter notified hosts accordingly to strategy 61 ''' 62 63 # We act only on hosts that are ready to flush handlers 64 return [host for host in notified_hosts 65 if host in self._flushed_hosts and self._flushed_hosts[host]] 66 67 def __init__(self, tqm): 68 super(StrategyModule, self).__init__(tqm) 69 self._host_pinned = False 70 71 def run(self, iterator, play_context): 72 ''' 73 The "free" strategy is a bit more complex, in that it allows tasks to 74 be sent to hosts as quickly as they can be processed. This means that 75 some hosts may finish very quickly if run tasks result in little or no 76 work being done versus other systems. 77 78 The algorithm used here also tries to be more "fair" when iterating 79 through hosts by remembering the last host in the list to be given a task 80 and starting the search from there as opposed to the top of the hosts 81 list again, which would end up favoring hosts near the beginning of the 82 list. 83 ''' 84 85 # the last host to be given a task 86 last_host = 0 87 88 result = self._tqm.RUN_OK 89 90 # start with all workers being counted as being free 91 workers_free = len(self._workers) 92 93 self._set_hosts_cache(iterator._play) 94 95 if iterator._play.max_fail_percentage is not None: 96 display.warning("Using max_fail_percentage with the free strategy is not supported, as tasks are executed independently on each host") 97 98 work_to_do = True 99 while work_to_do and not self._tqm._terminated: 100 101 hosts_left = self.get_hosts_left(iterator) 102 103 if len(hosts_left) == 0: 104 self._tqm.send_callback('v2_playbook_on_no_hosts_remaining') 105 result = False 106 break 107 108 work_to_do = False # assume we have no more work to do 109 starting_host = last_host # save current position so we know when we've looped back around and need to break 110 111 # try and find an unblocked host with a task to run 112 host_results = [] 113 while True: 114 host = hosts_left[last_host] 115 display.debug("next free host: %s" % host) 116 host_name = host.get_name() 117 118 # peek at the next task for the host, to see if there's 119 # anything to do do for this host 120 (state, task) = iterator.get_next_task_for_host(host, peek=True) 121 display.debug("free host state: %s" % state, host=host_name) 122 display.debug("free host task: %s" % task, host=host_name) 123 if host_name not in self._tqm._unreachable_hosts and task: 124 125 # set the flag so the outer loop knows we've still found 126 # some work which needs to be done 127 work_to_do = True 128 129 display.debug("this host has work to do", host=host_name) 130 131 # check to see if this host is blocked (still executing a previous task) 132 if (host_name not in self._blocked_hosts or not self._blocked_hosts[host_name]): 133 134 display.debug("getting variables", host=host_name) 135 task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task, 136 _hosts=self._hosts_cache, 137 _hosts_all=self._hosts_cache_all) 138 self.add_tqm_variables(task_vars, play=iterator._play) 139 templar = Templar(loader=self._loader, variables=task_vars) 140 display.debug("done getting variables", host=host_name) 141 142 try: 143 throttle = int(templar.template(task.throttle)) 144 except Exception as e: 145 raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e) 146 147 if throttle > 0: 148 same_tasks = 0 149 for worker in self._workers: 150 if worker and worker.is_alive() and worker._task._uuid == task._uuid: 151 same_tasks += 1 152 153 display.debug("task: %s, same_tasks: %d" % (task.get_name(), same_tasks)) 154 if same_tasks >= throttle: 155 break 156 157 # pop the task, mark the host blocked, and queue it 158 self._blocked_hosts[host_name] = True 159 (state, task) = iterator.get_next_task_for_host(host) 160 161 try: 162 action = action_loader.get(task.action, class_only=True, collection_list=task.collections) 163 except KeyError: 164 # we don't care here, because the action may simply not have a 165 # corresponding action plugin 166 action = None 167 168 try: 169 task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty') 170 display.debug("done templating", host=host_name) 171 except Exception: 172 # just ignore any errors during task name templating, 173 # we don't care if it just shows the raw name 174 display.debug("templating failed for some reason", host=host_name) 175 176 run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False) 177 if run_once: 178 if action and getattr(action, 'BYPASS_HOST_LOOP', False): 179 raise AnsibleError("The '%s' module bypasses the host loop, which is currently not supported in the free strategy " 180 "and would instead execute for every host in the inventory list." % task.action, obj=task._ds) 181 else: 182 display.warning("Using run_once with the free strategy is not currently supported. This task will still be " 183 "executed for every host in the inventory list.") 184 185 # check to see if this task should be skipped, due to it being a member of a 186 # role which has already run (and whether that role allows duplicate execution) 187 if task._role and task._role.has_run(host): 188 # If there is no metadata, the default behavior is to not allow duplicates, 189 # if there is metadata, check to see if the allow_duplicates flag was set to true 190 if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates: 191 display.debug("'%s' skipped because role has already run" % task, host=host_name) 192 del self._blocked_hosts[host_name] 193 continue 194 195 if task.action in C._ACTION_META: 196 self._execute_meta(task, play_context, iterator, target_host=host) 197 self._blocked_hosts[host_name] = False 198 else: 199 # handle step if needed, skip meta actions as they are used internally 200 if not self._step or self._take_step(task, host_name): 201 if task.any_errors_fatal: 202 display.warning("Using any_errors_fatal with the free strategy is not supported, " 203 "as tasks are executed independently on each host") 204 self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False) 205 self._queue_task(host, task, task_vars, play_context) 206 # each task is counted as a worker being busy 207 workers_free -= 1 208 del task_vars 209 else: 210 display.debug("%s is blocked, skipping for now" % host_name) 211 212 # all workers have tasks to do (and the current host isn't done with the play). 213 # loop back to starting host and break out 214 if self._host_pinned and workers_free == 0 and work_to_do: 215 last_host = starting_host 216 break 217 218 # move on to the next host and make sure we 219 # haven't gone past the end of our hosts list 220 last_host += 1 221 if last_host > len(hosts_left) - 1: 222 last_host = 0 223 224 # if we've looped around back to the start, break out 225 if last_host == starting_host: 226 break 227 228 results = self._process_pending_results(iterator) 229 host_results.extend(results) 230 231 # each result is counted as a worker being free again 232 workers_free += len(results) 233 234 self.update_active_connections(results) 235 236 included_files = IncludedFile.process_include_results( 237 host_results, 238 iterator=iterator, 239 loader=self._loader, 240 variable_manager=self._variable_manager 241 ) 242 243 if len(included_files) > 0: 244 all_blocks = dict((host, []) for host in hosts_left) 245 for included_file in included_files: 246 display.debug("collecting new blocks for %s" % included_file) 247 try: 248 if included_file._is_role: 249 new_ir = self._copy_included_file(included_file) 250 251 new_blocks, handler_blocks = new_ir.get_block_list( 252 play=iterator._play, 253 variable_manager=self._variable_manager, 254 loader=self._loader, 255 ) 256 else: 257 new_blocks = self._load_included_file(included_file, iterator=iterator) 258 except AnsibleError as e: 259 for host in included_file._hosts: 260 iterator.mark_host_failed(host) 261 display.warning(to_text(e)) 262 continue 263 264 for new_block in new_blocks: 265 task_vars = self._variable_manager.get_vars(play=iterator._play, task=new_block.get_first_parent_include(), 266 _hosts=self._hosts_cache, 267 _hosts_all=self._hosts_cache_all) 268 final_block = new_block.filter_tagged_tasks(task_vars) 269 for host in hosts_left: 270 if host in included_file._hosts: 271 all_blocks[host].append(final_block) 272 display.debug("done collecting new blocks for %s" % included_file) 273 274 display.debug("adding all collected blocks from %d included file(s) to iterator" % len(included_files)) 275 for host in hosts_left: 276 iterator.add_tasks(host, all_blocks[host]) 277 display.debug("done adding collected blocks to iterator") 278 279 # pause briefly so we don't spin lock 280 time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) 281 282 # collect all the final results 283 results = self._wait_on_pending_results(iterator) 284 285 # run the base class run() method, which executes the cleanup function 286 # and runs any outstanding handlers which have been triggered 287 return super(StrategyModule, self).run(iterator, play_context, result) 288