1# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
2#
3# This file is part of Ansible
4#
5# Ansible is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9#
10# Ansible is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
17# Make coding more python3-ish
18from __future__ import (absolute_import, division, print_function)
19__metaclass__ = type
20
21DOCUMENTATION = '''
22    name: linear
23    short_description: Executes tasks in a linear fashion
24    description:
25        - Task execution is in lockstep per host batch as defined by C(serial) (default all).
26          Up to the fork limit of hosts will execute each task at the same time and then
27          the next series of hosts until the batch is done, before going on to the next task.
28    version_added: "2.0"
29    notes:
30     - This was the default Ansible behaviour before 'strategy plugins' were introduced in 2.0.
31    author: Ansible Core Team
32'''
33
34from ansible import constants as C
35from ansible.errors import AnsibleError, AnsibleAssertionError
36from ansible.executor.play_iterator import PlayIterator
37from ansible.module_utils.six import iteritems
38from ansible.module_utils._text import to_text
39from ansible.playbook.block import Block
40from ansible.playbook.included_file import IncludedFile
41from ansible.playbook.task import Task
42from ansible.plugins.loader import action_loader
43from ansible.plugins.strategy import StrategyBase
44from ansible.template import Templar
45from ansible.utils.display import Display
46
47display = Display()
48
49
50class StrategyModule(StrategyBase):
51
52    noop_task = None
53
54    def _replace_with_noop(self, target):
55        if self.noop_task is None:
56            raise AnsibleAssertionError('strategy.linear.StrategyModule.noop_task is None, need Task()')
57
58        result = []
59        for el in target:
60            if isinstance(el, Task):
61                result.append(self.noop_task)
62            elif isinstance(el, Block):
63                result.append(self._create_noop_block_from(el, el._parent))
64        return result
65
66    def _create_noop_block_from(self, original_block, parent):
67        noop_block = Block(parent_block=parent)
68        noop_block.block = self._replace_with_noop(original_block.block)
69        noop_block.always = self._replace_with_noop(original_block.always)
70        noop_block.rescue = self._replace_with_noop(original_block.rescue)
71
72        return noop_block
73
74    def _prepare_and_create_noop_block_from(self, original_block, parent, iterator):
75        self.noop_task = Task()
76        self.noop_task.action = 'meta'
77        self.noop_task.args['_raw_params'] = 'noop'
78        self.noop_task.implicit = True
79        self.noop_task.set_loader(iterator._play._loader)
80
81        return self._create_noop_block_from(original_block, parent)
82
83    def _get_next_task_lockstep(self, hosts, iterator):
84        '''
85        Returns a list of (host, task) tuples, where the task may
86        be a noop task to keep the iterator in lock step across
87        all hosts.
88        '''
89
90        noop_task = Task()
91        noop_task.action = 'meta'
92        noop_task.args['_raw_params'] = 'noop'
93        noop_task.implicit = True
94        noop_task.set_loader(iterator._play._loader)
95
96        host_tasks = {}
97        display.debug("building list of next tasks for hosts")
98        for host in hosts:
99            host_tasks[host.name] = iterator.get_next_task_for_host(host, peek=True)
100        display.debug("done building task lists")
101
102        num_setups = 0
103        num_tasks = 0
104        num_rescue = 0
105        num_always = 0
106
107        display.debug("counting tasks in each state of execution")
108        host_tasks_to_run = [(host, state_task)
109                             for host, state_task in iteritems(host_tasks)
110                             if state_task and state_task[1]]
111
112        if host_tasks_to_run:
113            try:
114                lowest_cur_block = min(
115                    (iterator.get_active_state(s).cur_block for h, (s, t) in host_tasks_to_run
116                     if s.run_state != PlayIterator.ITERATING_COMPLETE))
117            except ValueError:
118                lowest_cur_block = None
119        else:
120            # empty host_tasks_to_run will just run till the end of the function
121            # without ever touching lowest_cur_block
122            lowest_cur_block = None
123
124        for (k, v) in host_tasks_to_run:
125            (s, t) = v
126
127            s = iterator.get_active_state(s)
128            if s.cur_block > lowest_cur_block:
129                # Not the current block, ignore it
130                continue
131
132            if s.run_state == PlayIterator.ITERATING_SETUP:
133                num_setups += 1
134            elif s.run_state == PlayIterator.ITERATING_TASKS:
135                num_tasks += 1
136            elif s.run_state == PlayIterator.ITERATING_RESCUE:
137                num_rescue += 1
138            elif s.run_state == PlayIterator.ITERATING_ALWAYS:
139                num_always += 1
140        display.debug("done counting tasks in each state of execution:\n\tnum_setups: %s\n\tnum_tasks: %s\n\tnum_rescue: %s\n\tnum_always: %s" % (num_setups,
141                                                                                                                                                  num_tasks,
142                                                                                                                                                  num_rescue,
143                                                                                                                                                  num_always))
144
145        def _advance_selected_hosts(hosts, cur_block, cur_state):
146            '''
147            This helper returns the task for all hosts in the requested
148            state, otherwise they get a noop dummy task. This also advances
149            the state of the host, since the given states are determined
150            while using peek=True.
151            '''
152            # we return the values in the order they were originally
153            # specified in the given hosts array
154            rvals = []
155            display.debug("starting to advance hosts")
156            for host in hosts:
157                host_state_task = host_tasks.get(host.name)
158                if host_state_task is None:
159                    continue
160                (s, t) = host_state_task
161                s = iterator.get_active_state(s)
162                if t is None:
163                    continue
164                if s.run_state == cur_state and s.cur_block == cur_block:
165                    new_t = iterator.get_next_task_for_host(host)
166                    rvals.append((host, t))
167                else:
168                    rvals.append((host, noop_task))
169            display.debug("done advancing hosts to next task")
170            return rvals
171
172        # if any hosts are in ITERATING_SETUP, return the setup task
173        # while all other hosts get a noop
174        if num_setups:
175            display.debug("advancing hosts in ITERATING_SETUP")
176            return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_SETUP)
177
178        # if any hosts are in ITERATING_TASKS, return the next normal
179        # task for these hosts, while all other hosts get a noop
180        if num_tasks:
181            display.debug("advancing hosts in ITERATING_TASKS")
182            return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_TASKS)
183
184        # if any hosts are in ITERATING_RESCUE, return the next rescue
185        # task for these hosts, while all other hosts get a noop
186        if num_rescue:
187            display.debug("advancing hosts in ITERATING_RESCUE")
188            return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_RESCUE)
189
190        # if any hosts are in ITERATING_ALWAYS, return the next always
191        # task for these hosts, while all other hosts get a noop
192        if num_always:
193            display.debug("advancing hosts in ITERATING_ALWAYS")
194            return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_ALWAYS)
195
196        # at this point, everything must be ITERATING_COMPLETE, so we
197        # return None for all hosts in the list
198        display.debug("all hosts are done, so returning None's for all hosts")
199        return [(host, None) for host in hosts]
200
201    def run(self, iterator, play_context):
202        '''
203        The linear strategy is simple - get the next task and queue
204        it for all hosts, then wait for the queue to drain before
205        moving on to the next task
206        '''
207
208        # iterate over each task, while there is one left to run
209        result = self._tqm.RUN_OK
210        work_to_do = True
211
212        self._set_hosts_cache(iterator._play)
213
214        while work_to_do and not self._tqm._terminated:
215
216            try:
217                display.debug("getting the remaining hosts for this loop")
218                hosts_left = self.get_hosts_left(iterator)
219                display.debug("done getting the remaining hosts for this loop")
220
221                # queue up this task for each host in the inventory
222                callback_sent = False
223                work_to_do = False
224
225                host_results = []
226                host_tasks = self._get_next_task_lockstep(hosts_left, iterator)
227
228                # skip control
229                skip_rest = False
230                choose_step = True
231
232                # flag set if task is set to any_errors_fatal
233                any_errors_fatal = False
234
235                results = []
236                for (host, task) in host_tasks:
237                    if not task:
238                        continue
239
240                    if self._tqm._terminated:
241                        break
242
243                    run_once = False
244                    work_to_do = True
245
246                    # check to see if this task should be skipped, due to it being a member of a
247                    # role which has already run (and whether that role allows duplicate execution)
248                    if task._role and task._role.has_run(host):
249                        # If there is no metadata, the default behavior is to not allow duplicates,
250                        # if there is metadata, check to see if the allow_duplicates flag was set to true
251                        if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates:
252                            display.debug("'%s' skipped because role has already run" % task)
253                            continue
254
255                    display.debug("getting variables")
256                    task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task,
257                                                                _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all)
258                    self.add_tqm_variables(task_vars, play=iterator._play)
259                    templar = Templar(loader=self._loader, variables=task_vars)
260                    display.debug("done getting variables")
261
262                    # test to see if the task across all hosts points to an action plugin which
263                    # sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we
264                    # will only send this task to the first host in the list.
265
266                    task_action = templar.template(task.action)
267
268                    try:
269                        action = action_loader.get(task_action, class_only=True, collection_list=task.collections)
270                    except KeyError:
271                        # we don't care here, because the action may simply not have a
272                        # corresponding action plugin
273                        action = None
274
275                    if task_action in C._ACTION_META:
276                        # for the linear strategy, we run meta tasks just once and for
277                        # all hosts currently being iterated over rather than one host
278                        results.extend(self._execute_meta(task, play_context, iterator, host))
279                        if task.args.get('_raw_params', None) not in ('noop', 'reset_connection', 'end_host', 'role_complete'):
280                            run_once = True
281                        if (task.any_errors_fatal or run_once) and not task.ignore_errors:
282                            any_errors_fatal = True
283                    else:
284                        # handle step if needed, skip meta actions as they are used internally
285                        if self._step and choose_step:
286                            if self._take_step(task):
287                                choose_step = False
288                            else:
289                                skip_rest = True
290                                break
291
292                        run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False)
293
294                        if (task.any_errors_fatal or run_once) and not task.ignore_errors:
295                            any_errors_fatal = True
296
297                        if not callback_sent:
298                            display.debug("sending task start callback, copying the task so we can template it temporarily")
299                            saved_name = task.name
300                            display.debug("done copying, going to template now")
301                            try:
302                                task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
303                                display.debug("done templating")
304                            except Exception:
305                                # just ignore any errors during task name templating,
306                                # we don't care if it just shows the raw name
307                                display.debug("templating failed for some reason")
308                            display.debug("here goes the callback...")
309                            self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
310                            task.name = saved_name
311                            callback_sent = True
312                            display.debug("sending task start callback")
313
314                        self._blocked_hosts[host.get_name()] = True
315                        self._queue_task(host, task, task_vars, play_context)
316                        del task_vars
317
318                    # if we're bypassing the host loop, break out now
319                    if run_once:
320                        break
321
322                    results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1)))
323
324                # go to next host/task group
325                if skip_rest:
326                    continue
327
328                display.debug("done queuing things up, now waiting for results queue to drain")
329                if self._pending_results > 0:
330                    results += self._wait_on_pending_results(iterator)
331
332                host_results.extend(results)
333
334                self.update_active_connections(results)
335
336                included_files = IncludedFile.process_include_results(
337                    host_results,
338                    iterator=iterator,
339                    loader=self._loader,
340                    variable_manager=self._variable_manager
341                )
342
343                include_failure = False
344                if len(included_files) > 0:
345                    display.debug("we have included files to process")
346
347                    display.debug("generating all_blocks data")
348                    all_blocks = dict((host, []) for host in hosts_left)
349                    display.debug("done generating all_blocks data")
350                    for included_file in included_files:
351                        display.debug("processing included file: %s" % included_file._filename)
352                        # included hosts get the task list while those excluded get an equal-length
353                        # list of noop tasks, to make sure that they continue running in lock-step
354                        try:
355                            if included_file._is_role:
356                                new_ir = self._copy_included_file(included_file)
357
358                                new_blocks, handler_blocks = new_ir.get_block_list(
359                                    play=iterator._play,
360                                    variable_manager=self._variable_manager,
361                                    loader=self._loader,
362                                )
363                            else:
364                                new_blocks = self._load_included_file(included_file, iterator=iterator)
365
366                            display.debug("iterating over new_blocks loaded from include file")
367                            for new_block in new_blocks:
368                                task_vars = self._variable_manager.get_vars(
369                                    play=iterator._play,
370                                    task=new_block.get_first_parent_include(),
371                                    _hosts=self._hosts_cache,
372                                    _hosts_all=self._hosts_cache_all,
373                                )
374                                display.debug("filtering new block on tags")
375                                final_block = new_block.filter_tagged_tasks(task_vars)
376                                display.debug("done filtering new block on tags")
377
378                                noop_block = self._prepare_and_create_noop_block_from(final_block, task._parent, iterator)
379
380                                for host in hosts_left:
381                                    if host in included_file._hosts:
382                                        all_blocks[host].append(final_block)
383                                    else:
384                                        all_blocks[host].append(noop_block)
385                            display.debug("done iterating over new_blocks loaded from include file")
386
387                        except AnsibleError as e:
388                            for host in included_file._hosts:
389                                self._tqm._failed_hosts[host.name] = True
390                                iterator.mark_host_failed(host)
391                            display.error(to_text(e), wrap_text=False)
392                            include_failure = True
393                            continue
394
395                    # finally go through all of the hosts and append the
396                    # accumulated blocks to their list of tasks
397                    display.debug("extending task lists for all hosts with included blocks")
398
399                    for host in hosts_left:
400                        iterator.add_tasks(host, all_blocks[host])
401
402                    display.debug("done extending task lists")
403                    display.debug("done processing included files")
404
405                display.debug("results queue empty")
406
407                display.debug("checking for any_errors_fatal")
408                failed_hosts = []
409                unreachable_hosts = []
410                for res in results:
411                    # execute_meta() does not set 'failed' in the TaskResult
412                    # so we skip checking it with the meta tasks and look just at the iterator
413                    if (res.is_failed() or res._task.action in C._ACTION_META) and iterator.is_failed(res._host):
414                        failed_hosts.append(res._host.name)
415                    elif res.is_unreachable():
416                        unreachable_hosts.append(res._host.name)
417
418                # if any_errors_fatal and we had an error, mark all hosts as failed
419                if any_errors_fatal and (len(failed_hosts) > 0 or len(unreachable_hosts) > 0):
420                    dont_fail_states = frozenset([iterator.ITERATING_RESCUE, iterator.ITERATING_ALWAYS])
421                    for host in hosts_left:
422                        (s, _) = iterator.get_next_task_for_host(host, peek=True)
423                        # the state may actually be in a child state, use the get_active_state()
424                        # method in the iterator to figure out the true active state
425                        s = iterator.get_active_state(s)
426                        if s.run_state not in dont_fail_states or \
427                           s.run_state == iterator.ITERATING_RESCUE and s.fail_state & iterator.FAILED_RESCUE != 0:
428                            self._tqm._failed_hosts[host.name] = True
429                            result |= self._tqm.RUN_FAILED_BREAK_PLAY
430                display.debug("done checking for any_errors_fatal")
431
432                display.debug("checking for max_fail_percentage")
433                if iterator._play.max_fail_percentage is not None and len(results) > 0:
434                    percentage = iterator._play.max_fail_percentage / 100.0
435
436                    if (len(self._tqm._failed_hosts) / iterator.batch_size) > percentage:
437                        for host in hosts_left:
438                            # don't double-mark hosts, or the iterator will potentially
439                            # fail them out of the rescue/always states
440                            if host.name not in failed_hosts:
441                                self._tqm._failed_hosts[host.name] = True
442                                iterator.mark_host_failed(host)
443                        self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
444                        result |= self._tqm.RUN_FAILED_BREAK_PLAY
445                    display.debug('(%s failed / %s total )> %s max fail' % (len(self._tqm._failed_hosts), iterator.batch_size, percentage))
446                display.debug("done checking for max_fail_percentage")
447
448                display.debug("checking to see if all hosts have failed and the running result is not ok")
449                if result != self._tqm.RUN_OK and len(self._tqm._failed_hosts) >= len(hosts_left):
450                    display.debug("^ not ok, so returning result now")
451                    self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
452                    return result
453                display.debug("done checking to see if all hosts have failed")
454
455            except (IOError, EOFError) as e:
456                display.debug("got IOError/EOFError in task loop: %s" % e)
457                # most likely an abort, return failed
458                return self._tqm.RUN_UNKNOWN_ERROR
459
460        # run the base class run() method, which executes the cleanup function
461        # and runs any outstanding handlers which have been triggered
462
463        return super(StrategyModule, self).run(iterator, play_context, result)
464