1# -*- coding: utf-8 -*- 2 3# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may 6# not use this file except in compliance with the License. You may obtain 7# a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations 15# under the License. 16 17from taskflow.engines.action_engine.actions import base 18from taskflow import retry as retry_atom 19from taskflow import states 20from taskflow.types import failure 21 22 23class RetryAction(base.Action): 24 """An action that handles executing, state changes, ... of retry atoms.""" 25 26 def __init__(self, storage, notifier, retry_executor): 27 super(RetryAction, self).__init__(storage, notifier) 28 self._retry_executor = retry_executor 29 30 def _get_retry_args(self, retry, revert=False, addons=None): 31 if revert: 32 arguments = self._storage.fetch_mapped_args( 33 retry.revert_rebind, 34 atom_name=retry.name, 35 optional_args=retry.revert_optional 36 ) 37 else: 38 arguments = self._storage.fetch_mapped_args( 39 retry.rebind, 40 atom_name=retry.name, 41 optional_args=retry.optional 42 ) 43 history = self._storage.get_retry_history(retry.name) 44 arguments[retry_atom.EXECUTE_REVERT_HISTORY] = history 45 if addons: 46 arguments.update(addons) 47 return arguments 48 49 def change_state(self, retry, state, result=base.Action.NO_RESULT): 50 old_state = self._storage.get_atom_state(retry.name) 51 if state in self.SAVE_RESULT_STATES: 52 save_result = None 53 if result is not self.NO_RESULT: 54 save_result = result 55 self._storage.save(retry.name, save_result, state) 56 # TODO(harlowja): combine this with the save to avoid a call 57 # back into the persistence layer... 58 if state == states.REVERTED: 59 self._storage.cleanup_retry_history(retry.name, state) 60 else: 61 if state == old_state: 62 # NOTE(imelnikov): nothing really changed, so we should not 63 # write anything to storage and run notifications. 64 return 65 self._storage.set_atom_state(retry.name, state) 66 retry_uuid = self._storage.get_atom_uuid(retry.name) 67 details = { 68 'retry_name': retry.name, 69 'retry_uuid': retry_uuid, 70 'old_state': old_state, 71 } 72 if result is not self.NO_RESULT: 73 details['result'] = result 74 self._notifier.notify(state, details) 75 76 def schedule_execution(self, retry): 77 self.change_state(retry, states.RUNNING) 78 return self._retry_executor.execute_retry( 79 retry, self._get_retry_args(retry)) 80 81 def complete_reversion(self, retry, result): 82 if isinstance(result, failure.Failure): 83 self.change_state(retry, states.REVERT_FAILURE, result=result) 84 else: 85 self.change_state(retry, states.REVERTED, result=result) 86 87 def complete_execution(self, retry, result): 88 if isinstance(result, failure.Failure): 89 self.change_state(retry, states.FAILURE, result=result) 90 else: 91 self.change_state(retry, states.SUCCESS, result=result) 92 93 def schedule_reversion(self, retry): 94 self.change_state(retry, states.REVERTING) 95 arg_addons = { 96 retry_atom.REVERT_FLOW_FAILURES: self._storage.get_failures(), 97 } 98 return self._retry_executor.revert_retry( 99 retry, self._get_retry_args(retry, addons=arg_addons, revert=True)) 100 101 def on_failure(self, retry, atom, last_failure): 102 self._storage.save_retry_failure(retry.name, atom.name, last_failure) 103 arguments = self._get_retry_args(retry) 104 return retry.on_failure(**arguments) 105