1# -*- coding: utf-8 -*-
2
3#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16
17from concurrent import futures
18import weakref
19
20from automaton import machines
21from oslo_utils import timeutils
22
23from taskflow import logging
24from taskflow import states as st
25from taskflow.types import failure
26from taskflow.utils import iter_utils
27
# Default waiting state timeout (in seconds); ``MachineBuilder.build`` falls
# back to this value when it is called with ``timeout=None``.
WAITING_TIMEOUT = 60

# Meta states the state machine uses; ``UNDEFINED`` is the machine's default
# start state and ``GAME_OVER`` is the intermediate state every run passes
# through just before one of the terminal states is entered.
UNDEFINED = 'UNDEFINED'
GAME_OVER = 'GAME_OVER'
META_STATES = (GAME_OVER, UNDEFINED)

# Event name constants the state machine uses; these are the values that
# the reaction functions return to select the next transition.
SCHEDULE = 'schedule_next'
WAIT = 'wait_finished'
ANALYZE = 'examine_finished'
FINISH = 'completed'
FAILED = 'failed'
SUSPENDED = 'suspended'
SUCCESS = 'success'
REVERTED = 'reverted'
START = 'start'

# Internal enums used to denote how/if an atom was completed (these are the
# possible results of completing a single finished future).
FAILED_COMPLETING = 'failed_completing'
WAS_CANCELLED = 'was_cancelled'
SUCCESSFULLY_COMPLETED = 'successfully_completed'


# For these states we will gather how long (in seconds) the
# state was in-progress (cumulatively if the state is entered multiple
# times); the totals are published under the ``seconds_per_state``
# statistics key using the lower-cased state name.
TIMED_STATES = (st.ANALYZING, st.RESUMING, st.SCHEDULING, st.WAITING)

# Module level logger.
LOG = logging.getLogger(__name__)
59
60
class MachineMemory(object):
    """Mutable scratch space shared by the state machine reactions.

    Four collections are tracked:

    * ``next_up``: atoms that have been selected to be scheduled.
    * ``not_done``: futures that have not yet been reported as finished.
    * ``failures``: failures that have been captured so far.
    * ``done``: futures reported as finished that still await processing.
    """

    def __init__(self):
        self.next_up, self.not_done = set(), set()
        self.failures = []
        self.done = set()

    def cancel_futures(self):
        """Asks every future that is still not done to cancel itself.

        The result of each cancellation request is ignored.
        """
        for pending in self.not_done:
            pending.cancel()
74
75
class MachineBuilder(object):
    """State machine *builder* that powers the engine components.

    NOTE(harlowja): the machine (states and events that will trigger
    transitions) that this builds is represented by the following
    table::

        +--------------+------------------+------------+----------+---------+
        |    Start     |      Event       |    End     | On Enter | On Exit |
        +--------------+------------------+------------+----------+---------+
        |  ANALYZING   |    completed     | GAME_OVER  |    .     |    .    |
        |  ANALYZING   |  schedule_next   | SCHEDULING |    .     |    .    |
        |  ANALYZING   |  wait_finished   |  WAITING   |    .     |    .    |
        |  FAILURE[$]  |        .         |     .      |    .     |    .    |
        |  GAME_OVER   |      failed      |  FAILURE   |    .     |    .    |
        |  GAME_OVER   |     reverted     |  REVERTED  |    .     |    .    |
        |  GAME_OVER   |     success      |  SUCCESS   |    .     |    .    |
        |  GAME_OVER   |    suspended     | SUSPENDED  |    .     |    .    |
        |   RESUMING   |  schedule_next   | SCHEDULING |    .     |    .    |
        | REVERTED[$]  |        .         |     .      |    .     |    .    |
        |  SCHEDULING  |  wait_finished   |  WAITING   |    .     |    .    |
        |  SUCCESS[$]  |        .         |     .      |    .     |    .    |
        | SUSPENDED[$] |        .         |     .      |    .     |    .    |
        | UNDEFINED[^] |      start       |  RESUMING  |    .     |    .    |
        |   WAITING    | examine_finished | ANALYZING  |    .     |    .    |
        +--------------+------------------+------------+----------+---------+

    Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``)
    if the engine has been suspended or the engine has failed (due to a
    non-resolvable task failure or scheduling failure) the machine will stop
    executing new tasks (currently running tasks will be allowed to complete)
    and this machine's run loop will be broken.

    NOTE(harlowja): If the runtime's scheduler component is able to schedule
    tasks in parallel, this enables parallel running and/or reversion.
    """

    def __init__(self, runtime, waiter):
        """Captures the runtime components the built machine will use.

        :param runtime: object providing ``selector``, ``completer``,
                        ``scheduler`` and ``storage`` attributes (and an
                        ``is_success()`` method that is consulted when the
                        machine finishes)
        :param waiter: callable invoked as ``waiter(futures, timeout=...)``
                       that returns a ``(done, not_done)`` pair
        """
        # Only a weak proxy to the runtime itself is retained; the
        # components pulled off of it are referenced directly.
        self._runtime = weakref.proxy(runtime)
        self._selector = runtime.selector
        self._completer = runtime.completer
        self._scheduler = runtime.scheduler
        self._storage = runtime.storage
        self._waiter = waiter

    def build(self, statistics, timeout=None, gather_statistics=True):
        """Builds a state-machine (that is used during running).

        :param statistics: mutable mapping; when ``gather_statistics`` is
                           true it is seeded with the ``seconds_per_state``,
                           ``discarded_failures``, ``awaiting``,
                           ``completed`` and ``incomplete`` keys, which the
                           machine's callbacks then keep up to date
        :param timeout: timeout (in seconds) handed to the waiter each time
                        the machine waits; ``WAITING_TIMEOUT`` is used when
                        this is ``None``
        :param gather_statistics: whether ``statistics`` should be populated
                                  and updated at all
        :returns: a ``(machine, memory)`` tuple made of the frozen finite
                  machine and the ``MachineMemory`` its reactions share
        """
        # These are always bound (exactly once) so that the nested callbacks
        # below close over defined names even when no statistics are being
        # gathered; they simply stay empty in that case.
        watches = {}
        state_statistics = {}
        if gather_statistics:
            statistics['seconds_per_state'] = state_statistics
            for timed_state in TIMED_STATES:
                state_statistics[timed_state.lower()] = 0.0
                watches[timed_state] = timeutils.StopWatch()
            statistics['discarded_failures'] = 0
            statistics['awaiting'] = 0
            statistics['completed'] = 0
            statistics['incomplete'] = 0

        memory = MachineMemory()
        if timeout is None:
            timeout = WAITING_TIMEOUT

        # Cache some local functions/methods...
        do_complete = self._completer.complete
        do_complete_failure = self._completer.complete_failure
        get_atom_intention = self._storage.get_atom_intention

        def do_schedule(next_nodes):
            # Hands the nodes to the scheduler ordered by their (optional)
            # ``priority`` attribute, highest priority first; nodes lacking
            # that attribute are treated as having priority zero.
            with self._storage.lock.write_lock():
                return self._scheduler.schedule(
                    sorted(next_nodes,
                           key=lambda node: getattr(node, 'priority', 0),
                           reverse=True))

        def iter_next_atoms(atom=None, apply_deciders=True):
            # Yields and filters and tweaks the next atoms to run...
            maybe_atoms_it = self._selector.iter_next_atoms(atom=atom)
            for atom, late_decider in maybe_atoms_it:
                if apply_deciders:
                    # The decider gets the final say on whether this atom
                    # is allowed to proceed.
                    proceed = late_decider.check_and_affect(self._runtime)
                    if proceed:
                        yield atom
                else:
                    yield atom

        def resume(old_state, new_state, event):
            # This reaction function just updates the state machines memory
            # to include any nodes that need to be executed (from a previous
            # attempt, which may be empty if never ran before) and any nodes
            # that are now ready to be ran.
            with self._storage.lock.write_lock():
                memory.next_up.update(
                    iter_utils.unique_seen((self._completer.resume(),
                                            iter_next_atoms())))
            return SCHEDULE

        def game_over(old_state, new_state, event):
            # This reaction function is mainly an intermediary delegation
            # function that analyzes the current memory and transitions to
            # the appropriate handler that will deal with the memory values,
            # it is *always* called before the final state is entered.
            if memory.failures:
                return FAILED
            with self._storage.lock.read_lock():
                leftover_atoms = iter_utils.count(
                    # Avoid activating the deciders, since at this point
                    # the engine is finishing and there will be no more further
                    # work done anyway...
                    iter_next_atoms(apply_deciders=False))
            if leftover_atoms:
                # Ok we didn't finish (either reverting or executing...) so
                # that means we must have been stopped at some point...
                LOG.trace("Suspension determined to have been reacted to"
                          " since (at least) %s atoms have been left in an"
                          " unfinished state", leftover_atoms)
                return SUSPENDED
            elif self._runtime.is_success():
                return SUCCESS
            else:
                return REVERTED

        def schedule(old_state, new_state, event):
            # This reaction function starts to schedule the memory's next
            # nodes (iff the engine is still runnable, which it may not be
            # if the user of this engine has requested the engine/storage
            # that holds this information to stop or suspend); handles failures
            # that occur during this process safely...
            with self._storage.lock.write_lock():
                current_flow_state = self._storage.get_flow_state()
                if current_flow_state == st.RUNNING and memory.next_up:
                    not_done, failures = do_schedule(memory.next_up)
                    if not_done:
                        memory.not_done.update(not_done)
                    if failures:
                        memory.failures.extend(failures)
                    memory.next_up.intersection_update(not_done)
                elif current_flow_state == st.SUSPENDING and memory.not_done:
                    # Try to force anything not cancelled to now be cancelled
                    # so that the executor that gets it does not continue to
                    # try to work on it (if the future execution is still in
                    # its backlog, if it's already being executed, this will
                    # do nothing).
                    memory.cancel_futures()
            return WAIT

        def complete_an_atom(fut):
            # This completes a single atom saving its result in
            # storage and preparing whatever predecessors or successors will
            # now be ready to execute (or revert or retry...); it also
            # handles failures that occur during this process safely...
            atom = fut.atom
            try:
                outcome, result = fut.result()
                do_complete(atom, outcome, result)
                if isinstance(result, failure.Failure):
                    retain = do_complete_failure(atom, outcome, result)
                    if retain:
                        memory.failures.append(result)
                    else:
                        # NOTE(harlowja): avoid making any intention request
                        # to storage unless we are sure we are in DEBUG
                        # enabled logging (otherwise we will call this all
                        # the time even when DEBUG is not enabled, which
                        # would suck...)
                        if LOG.isEnabledFor(logging.DEBUG):
                            intention = get_atom_intention(atom.name)
                            LOG.debug("Discarding failure '%s' (in response"
                                      " to outcome '%s') under completion"
                                      " units request during completion of"
                                      " atom '%s' (intention is to %s)",
                                      result, outcome, atom, intention)
                        if gather_statistics:
                            statistics['discarded_failures'] += 1
                if gather_statistics:
                    statistics['completed'] += 1
            except futures.CancelledError:
                # Well it got cancelled, skip doing anything
                # and move on; at a further time it will be resumed
                # and something should be done with it to get it
                # going again.
                return WAS_CANCELLED
            except Exception:
                # Capture whatever went wrong as a failure so that the
                # machine ends up in its failure handling path.
                memory.failures.append(failure.Failure())
                LOG.exception("Engine '%s' atom post-completion"
                              " failed", atom)
                return FAILED_COMPLETING
            else:
                return SUCCESSFULLY_COMPLETED

        def wait(old_state, new_state, event):
            # TODO(harlowja): maybe we should start doing 'yield from' this
            # call sometime in the future, or equivalent that will work in
            # py2 and py3.
            if memory.not_done:
                done, not_done = self._waiter(memory.not_done, timeout=timeout)
                memory.done.update(done)
                memory.not_done = not_done
            return ANALYZE

        def analyze(old_state, new_state, event):
            # This reaction function is responsible for analyzing all nodes
            # that have finished executing/reverting and figuring
            # out what nodes are now ready to be ran (and then triggering those
            # nodes to be scheduled in the future); handles failures that
            # occur during this process safely...
            next_up = set()
            with self._storage.lock.write_lock():
                while memory.done:
                    fut = memory.done.pop()
                    # Force it to be completed so that we can ensure that
                    # before we iterate over any successors or predecessors
                    # that we know it has been completed and saved and so on...
                    completion_status = complete_an_atom(fut)
                    if (not memory.failures
                            and completion_status != WAS_CANCELLED):
                        atom = fut.atom
                        try:
                            more_work = set(iter_next_atoms(atom=atom))
                        except Exception:
                            memory.failures.append(failure.Failure())
                            LOG.exception(
                                "Engine '%s' atom post-completion"
                                " next atom searching failed", atom)
                        else:
                            next_up.update(more_work)
            current_flow_state = self._storage.get_flow_state()
            if (current_flow_state == st.RUNNING
                    and next_up and not memory.failures):
                memory.next_up.update(next_up)
                return SCHEDULE
            elif memory.not_done:
                # Work is still outstanding; when suspending, request that
                # it be cancelled and then keep waiting on it.
                if current_flow_state == st.SUSPENDING:
                    memory.cancel_futures()
                return WAIT
            else:
                return FINISH

        def on_exit(old_state, event):
            LOG.trace("Exiting old state '%s' in response to event '%s'",
                      old_state, event)
            if gather_statistics:
                if old_state in watches:
                    # Accumulate the time spent in the state being left.
                    w = watches[old_state]
                    w.stop()
                    state_statistics[old_state.lower()] += w.elapsed()
                if old_state in (st.SCHEDULING, st.WAITING):
                    statistics['incomplete'] = len(memory.not_done)
                if old_state in (st.ANALYZING, st.SCHEDULING):
                    statistics['awaiting'] = len(memory.next_up)

        def on_enter(new_state, event):
            LOG.trace("Entering new state '%s' in response to event '%s'",
                      new_state, event)
            if gather_statistics and new_state in watches:
                watches[new_state].restart()

        # Every state shares the same enter/exit callbacks.
        state_kwargs = {
            'on_exit': on_exit,
            'on_enter': on_enter,
        }
        m = machines.FiniteMachine()
        m.add_state(GAME_OVER, **state_kwargs)
        m.add_state(UNDEFINED, **state_kwargs)
        m.add_state(st.ANALYZING, **state_kwargs)
        m.add_state(st.RESUMING, **state_kwargs)
        m.add_state(st.REVERTED, terminal=True, **state_kwargs)
        m.add_state(st.SCHEDULING, **state_kwargs)
        m.add_state(st.SUCCESS, terminal=True, **state_kwargs)
        m.add_state(st.SUSPENDED, terminal=True, **state_kwargs)
        m.add_state(st.WAITING, **state_kwargs)
        m.add_state(st.FAILURE, terminal=True, **state_kwargs)
        m.default_start_state = UNDEFINED

        m.add_transition(GAME_OVER, st.REVERTED, REVERTED)
        m.add_transition(GAME_OVER, st.SUCCESS, SUCCESS)
        m.add_transition(GAME_OVER, st.SUSPENDED, SUSPENDED)
        m.add_transition(GAME_OVER, st.FAILURE, FAILED)
        m.add_transition(UNDEFINED, st.RESUMING, START)
        m.add_transition(st.ANALYZING, GAME_OVER, FINISH)
        m.add_transition(st.ANALYZING, st.SCHEDULING, SCHEDULE)
        m.add_transition(st.ANALYZING, st.WAITING, WAIT)
        m.add_transition(st.RESUMING, st.SCHEDULING, SCHEDULE)
        m.add_transition(st.SCHEDULING, st.WAITING, WAIT)
        m.add_transition(st.WAITING, st.ANALYZING, ANALYZE)

        # Each reaction runs when its state is entered via the given event
        # and returns the next event to process.
        m.add_reaction(GAME_OVER, FINISH, game_over)
        m.add_reaction(st.ANALYZING, ANALYZE, analyze)
        m.add_reaction(st.RESUMING, START, resume)
        m.add_reaction(st.SCHEDULING, SCHEDULE, schedule)
        m.add_reaction(st.WAITING, WAIT, wait)

        m.freeze()
        return (m, memory)
371