# -*- coding: utf-8 -*-

#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from concurrent import futures
import weakref

from automaton import machines
from oslo_utils import timeutils

from taskflow import logging
from taskflow import states as st
from taskflow.types import failure
from taskflow.utils import iter_utils

# Default waiting state timeout (in seconds).
WAITING_TIMEOUT = 60

# Meta states the state machine uses.
UNDEFINED = 'UNDEFINED'
GAME_OVER = 'GAME_OVER'
META_STATES = (GAME_OVER, UNDEFINED)

# Event name constants the state machine uses.
SCHEDULE = 'schedule_next'
WAIT = 'wait_finished'
ANALYZE = 'examine_finished'
FINISH = 'completed'
FAILED = 'failed'
SUSPENDED = 'suspended'
SUCCESS = 'success'
REVERTED = 'reverted'
START = 'start'

# Internal enums used to denote how/if an atom was completed.
FAILED_COMPLETING = 'failed_completing'
WAS_CANCELLED = 'was_cancelled'
SUCCESSFULLY_COMPLETED = 'successfully_completed'


# For these states we will gather how long (in seconds) the
# state was in-progress (cumulatively if the state is entered multiple
# times)
TIMED_STATES = (st.ANALYZING, st.RESUMING, st.SCHEDULING, st.WAITING)

LOG = logging.getLogger(__name__)


class MachineMemory(object):
    """State machine memory.

    Plain container shared by the reaction functions created in
    :py:meth:`MachineBuilder.build`:

    * ``next_up``: set of atoms that should be scheduled next.
    * ``not_done``: set of futures that were scheduled and not yet reported
      as done by the waiter.
    * ``failures``: list of failures collected while scheduling, completing
      or analyzing.
    * ``done``: set of futures the waiter reported as done and that still
      have to be analyzed.
    """

    def __init__(self):
        self.next_up = set()
        self.not_done = set()
        self.failures = []
        self.done = set()

    def cancel_futures(self):
        """Attempts to cancel any not done futures."""
        for fut in self.not_done:
            fut.cancel()


class MachineBuilder(object):
    """State machine *builder* that powers the engine components.

    NOTE(harlowja): the machine (states and events that will trigger
    transitions) that this builds is represented by the following
    table::

        +--------------+------------------+------------+----------+---------+
        |    Start     |      Event       |    End     | On Enter | On Exit |
        +--------------+------------------+------------+----------+---------+
        |  ANALYZING   |    completed     | GAME_OVER  |    .     |    .    |
        |  ANALYZING   |  schedule_next   | SCHEDULING |    .     |    .    |
        |  ANALYZING   |  wait_finished   |  WAITING   |    .     |    .    |
        |  FAILURE[$]  |        .         |     .      |    .     |    .    |
        |  GAME_OVER   |      failed      |  FAILURE   |    .     |    .    |
        |  GAME_OVER   |     reverted     |  REVERTED  |    .     |    .    |
        |  GAME_OVER   |     success      |  SUCCESS   |    .     |    .    |
        |  GAME_OVER   |    suspended     | SUSPENDED  |    .     |    .    |
        |   RESUMING   |  schedule_next   | SCHEDULING |    .     |    .    |
        | REVERTED[$]  |        .         |     .      |    .     |    .    |
        |  SCHEDULING  |  wait_finished   |  WAITING   |    .     |    .    |
        |  SUCCESS[$]  |        .         |     .      |    .     |    .    |
        | SUSPENDED[$] |        .         |     .      |    .     |    .    |
        | UNDEFINED[^] |      start       |  RESUMING  |    .     |    .    |
        |   WAITING    | examine_finished | ANALYZING  |    .     |    .    |
        +--------------+------------------+------------+----------+---------+

    Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``)
    if the engine has been suspended or the engine has failed (due to a
    non-resolveable task failure or scheduling failure) the machine will stop
    executing new tasks (currently running tasks will be allowed to complete)
    and this machines run loop will be broken.

    NOTE(harlowja): If the runtimes scheduler component is able to schedule
    tasks in parallel, this enables parallel running and/or reversion.
    """

    def __init__(self, runtime, waiter):
        """Captures the runtime components the built machine will use.

        :param runtime: object providing ``selector``, ``completer``,
                        ``scheduler`` and ``storage`` attributes; the
                        runtime itself is only held via a weak proxy.
        :param waiter: callable invoked as ``waiter(futures, timeout=...)``
                       that returns a ``(done, not_done)`` pair.
        """
        self._runtime = weakref.proxy(runtime)
        self._selector = runtime.selector
        self._completer = runtime.completer
        self._scheduler = runtime.scheduler
        self._storage = runtime.storage
        self._waiter = waiter

    def build(self, statistics, timeout=None, gather_statistics=True):
        """Builds a state-machine (that is used during running).

        :param statistics: mutable mapping that (when ``gather_statistics``
                           is true) is populated/updated with the keys
                           ``seconds_per_state``, ``discarded_failures``,
                           ``awaiting``, ``completed`` and ``incomplete``.
        :param timeout: timeout handed to the waiter on every wait; when
                        ``None`` then ``WAITING_TIMEOUT`` is used.
        :param gather_statistics: whether ``statistics`` should be updated
                                  while the machine runs.
        :returns: a ``(machine, memory)`` tuple, the machine being frozen
                  and the memory being the :py:class:`MachineMemory` that
                  the machines reactions operate on.
        """
        # These are always bound (even when statistics are not gathered) so
        # that the closures below never reference an unbound variable; they
        # are only ever consulted when ``gather_statistics`` is true.
        watches = {}
        state_statistics = {}
        if gather_statistics:
            statistics['seconds_per_state'] = state_statistics
            for timed_state in TIMED_STATES:
                state_statistics[timed_state.lower()] = 0.0
                watches[timed_state] = timeutils.StopWatch()
            statistics['discarded_failures'] = 0
            statistics['awaiting'] = 0
            statistics['completed'] = 0
            statistics['incomplete'] = 0

        memory = MachineMemory()
        if timeout is None:
            timeout = WAITING_TIMEOUT

        # Cache some local functions/methods...
        do_complete = self._completer.complete
        do_complete_failure = self._completer.complete_failure
        get_atom_intention = self._storage.get_atom_intention

        def do_schedule(next_nodes):
            # Nodes are handed to the scheduler ordered by their (optional)
            # ``priority`` attribute, highest first.
            with self._storage.lock.write_lock():
                return self._scheduler.schedule(
                    sorted(next_nodes,
                           key=lambda node: getattr(node, 'priority', 0),
                           reverse=True))

        def iter_next_atoms(atom=None, apply_deciders=True):
            # Yields and filters and tweaks the next atoms to run...
            maybe_atoms_it = self._selector.iter_next_atoms(atom=atom)
            for atom, late_decider in maybe_atoms_it:
                if apply_deciders:
                    proceed = late_decider.check_and_affect(self._runtime)
                    if proceed:
                        yield atom
                else:
                    yield atom

        def resume(old_state, new_state, event):
            # This reaction function just updates the state machines memory
            # to include any nodes that need to be executed (from a previous
            # attempt, which may be empty if never ran before) and any nodes
            # that are now ready to be ran.
            with self._storage.lock.write_lock():
                memory.next_up.update(
                    iter_utils.unique_seen((self._completer.resume(),
                                            iter_next_atoms())))
            return SCHEDULE

        def game_over(old_state, new_state, event):
            # This reaction function is mainly a intermediary delegation
            # function that analyzes the current memory and transitions to
            # the appropriate handler that will deal with the memory values,
            # it is *always* called before the final state is entered.
            if memory.failures:
                return FAILED
            with self._storage.lock.read_lock():
                leftover_atoms = iter_utils.count(
                    # Avoid activating the deciders, since at this point
                    # the engine is finishing and there will be no more further
                    # work done anyway...
                    iter_next_atoms(apply_deciders=False))
            if leftover_atoms:
                # Ok we didn't finish (either reverting or executing...) so
                # that means we must of been stopped at some point...
                LOG.trace("Suspension determined to have been reacted to"
                          " since (at least) %s atoms have been left in an"
                          " unfinished state", leftover_atoms)
                return SUSPENDED
            elif self._runtime.is_success():
                return SUCCESS
            else:
                return REVERTED

        def schedule(old_state, new_state, event):
            # This reaction function starts to schedule the memory's next
            # nodes (iff the engine is still runnable, which it may not be
            # if the user of this engine has requested the engine/storage
            # that holds this information to stop or suspend); handles failures
            # that occur during this process safely...
            with self._storage.lock.write_lock():
                current_flow_state = self._storage.get_flow_state()
                if current_flow_state == st.RUNNING and memory.next_up:
                    not_done, failures = do_schedule(memory.next_up)
                    if not_done:
                        memory.not_done.update(not_done)
                    if failures:
                        memory.failures.extend(failures)
                    # Only keep what is also in the returned ``not_done``.
                    # NOTE(review): presumably this empties ``next_up`` (the
                    # scheduler appears to hand back futures, not atoms);
                    # confirm against the scheduler implementation.
                    memory.next_up.intersection_update(not_done)
                elif current_flow_state == st.SUSPENDING and memory.not_done:
                    # Try to force anything not cancelled to now be cancelled
                    # so that the executor that gets it does not continue to
                    # try to work on it (if the future execution is still in
                    # its backlog, if it's already being executed, this will
                    # do nothing).
                    memory.cancel_futures()
            return WAIT

        def complete_an_atom(fut):
            # This completes a single atom saving its result in
            # storage and preparing whatever predecessors or successors will
            # now be ready to execute (or revert or retry...); it also
            # handles failures that occur during this process safely...
            atom = fut.atom
            try:
                outcome, result = fut.result()
                do_complete(atom, outcome, result)
                if isinstance(result, failure.Failure):
                    retain = do_complete_failure(atom, outcome, result)
                    if retain:
                        memory.failures.append(result)
                    else:
                        # NOTE(harlowja): avoid making any intention request
                        # to storage unless we are sure we are in DEBUG
                        # enabled logging (otherwise we will call this all
                        # the time even when DEBUG is not enabled, which
                        # would suck...)
                        if LOG.isEnabledFor(logging.DEBUG):
                            intention = get_atom_intention(atom.name)
                            LOG.debug("Discarding failure '%s' (in response"
                                      " to outcome '%s') under completion"
                                      " units request during completion of"
                                      " atom '%s' (intention is to %s)",
                                      result, outcome, atom, intention)
                        if gather_statistics:
                            statistics['discarded_failures'] += 1
                if gather_statistics:
                    statistics['completed'] += 1
            except futures.CancelledError:
                # Well it got cancelled, skip doing anything
                # and move on; at a further time it will be resumed
                # and something should be done with it to get it
                # going again.
                return WAS_CANCELLED
            except Exception:
                memory.failures.append(failure.Failure())
                LOG.exception("Engine '%s' atom post-completion"
                              " failed", atom)
                return FAILED_COMPLETING
            else:
                return SUCCESSFULLY_COMPLETED

        def wait(old_state, new_state, event):
            # TODO(harlowja): maybe we should start doing 'yield from' this
            # call sometime in the future, or equivalent that will work in
            # py2 and py3.
            if memory.not_done:
                done, not_done = self._waiter(memory.not_done, timeout=timeout)
                memory.done.update(done)
                memory.not_done = not_done
            return ANALYZE

        def analyze(old_state, new_state, event):
            # This reaction function is responsible for analyzing all nodes
            # that have finished executing/reverting and figuring
            # out what nodes are now ready to be ran (and then triggering those
            # nodes to be scheduled in the future); handles failures that
            # occur during this process safely...
            next_up = set()
            with self._storage.lock.write_lock():
                while memory.done:
                    fut = memory.done.pop()
                    # Force it to be completed so that we can ensure that
                    # before we iterate over any successors or predecessors
                    # that we know it has been completed and saved and so on...
                    completion_status = complete_an_atom(fut)
                    if (not memory.failures
                            and completion_status != WAS_CANCELLED):
                        atom = fut.atom
                        try:
                            more_work = set(iter_next_atoms(atom=atom))
                        except Exception:
                            memory.failures.append(failure.Failure())
                            LOG.exception(
                                "Engine '%s' atom post-completion"
                                " next atom searching failed", atom)
                        else:
                            next_up.update(more_work)
            current_flow_state = self._storage.get_flow_state()
            if (current_flow_state == st.RUNNING
                    and next_up and not memory.failures):
                memory.next_up.update(next_up)
                return SCHEDULE
            elif memory.not_done:
                if current_flow_state == st.SUSPENDING:
                    memory.cancel_futures()
                return WAIT
            else:
                return FINISH

        def on_exit(old_state, event):
            LOG.trace("Exiting old state '%s' in response to event '%s'",
                      old_state, event)
            if gather_statistics:
                if old_state in watches:
                    w = watches[old_state]
                    w.stop()
                    state_statistics[old_state.lower()] += w.elapsed()
                if old_state in (st.SCHEDULING, st.WAITING):
                    statistics['incomplete'] = len(memory.not_done)
                if old_state in (st.ANALYZING, st.SCHEDULING):
                    statistics['awaiting'] = len(memory.next_up)

        def on_enter(new_state, event):
            LOG.trace("Entering new state '%s' in response to event '%s'",
                      new_state, event)
            if gather_statistics and new_state in watches:
                watches[new_state].restart()

        state_kwargs = {
            'on_exit': on_exit,
            'on_enter': on_enter,
        }
        m = machines.FiniteMachine()
        m.add_state(GAME_OVER, **state_kwargs)
        m.add_state(UNDEFINED, **state_kwargs)
        m.add_state(st.ANALYZING, **state_kwargs)
        m.add_state(st.RESUMING, **state_kwargs)
        m.add_state(st.REVERTED, terminal=True, **state_kwargs)
        m.add_state(st.SCHEDULING, **state_kwargs)
        m.add_state(st.SUCCESS, terminal=True, **state_kwargs)
        m.add_state(st.SUSPENDED, terminal=True, **state_kwargs)
        m.add_state(st.WAITING, **state_kwargs)
        m.add_state(st.FAILURE, terminal=True, **state_kwargs)
        m.default_start_state = UNDEFINED

        m.add_transition(GAME_OVER, st.REVERTED, REVERTED)
        m.add_transition(GAME_OVER, st.SUCCESS, SUCCESS)
        m.add_transition(GAME_OVER, st.SUSPENDED, SUSPENDED)
        m.add_transition(GAME_OVER, st.FAILURE, FAILED)
        m.add_transition(UNDEFINED, st.RESUMING, START)
        m.add_transition(st.ANALYZING, GAME_OVER, FINISH)
        m.add_transition(st.ANALYZING, st.SCHEDULING, SCHEDULE)
        m.add_transition(st.ANALYZING, st.WAITING, WAIT)
        m.add_transition(st.RESUMING, st.SCHEDULING, SCHEDULE)
        m.add_transition(st.SCHEDULING, st.WAITING, WAIT)
        m.add_transition(st.WAITING, st.ANALYZING, ANALYZE)

        m.add_reaction(GAME_OVER, FINISH, game_over)
        m.add_reaction(st.ANALYZING, ANALYZE, analyze)
        m.add_reaction(st.RESUMING, START, resume)
        m.add_reaction(st.SCHEDULING, SCHEDULE, schedule)
        m.add_reaction(st.WAITING, WAIT, wait)

        m.freeze()
        return (m, memory)