1# -*- coding: utf-8 -*-
2
3#    Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16
17from taskflow.engines.action_engine.actions import base
18from taskflow import retry as retry_atom
19from taskflow import states
20from taskflow.types import failure
21
22
class RetryAction(base.Action):
    """Action driving retry atoms (scheduling, completion, state changes)."""

    def __init__(self, storage, notifier, retry_executor):
        """Keep the executor used to run/revert retries.

        The ``storage`` and ``notifier`` objects are handed to the parent
        action constructor.
        """
        super(RetryAction, self).__init__(storage, notifier)
        self._retry_executor = retry_executor

    def _get_retry_args(self, retry, revert=False, addons=None):
        """Build the keyword arguments passed to a retry atom method.

        The mapped arguments are fetched from storage using the retry's
        revert rebind/optional settings when ``revert`` is true and its
        regular rebind/optional settings otherwise. The retry history found
        in storage is always added under the
        ``retry_atom.EXECUTE_REVERT_HISTORY`` key, and any provided
        ``addons`` are merged in last (so they override existing keys).
        """
        kwargs = self._storage.fetch_mapped_args(
            retry.revert_rebind if revert else retry.rebind,
            atom_name=retry.name,
            optional_args=retry.revert_optional if revert else retry.optional
        )
        kwargs[retry_atom.EXECUTE_REVERT_HISTORY] = (
            self._storage.get_retry_history(retry.name))
        if addons:
            kwargs.update(addons)
        return kwargs

    def change_state(self, retry, state, result=base.Action.NO_RESULT):
        """Record the retry's new ``state`` and notify about it.

        For states in ``SAVE_RESULT_STATES`` the result (``None`` when no
        result was provided) is saved together with the state, and the
        retry history is cleaned up when the state is ``REVERTED``. For any
        other state only the state is stored, and when it matches the
        state already in storage nothing is written and no notification is
        sent.
        """
        previous_state = self._storage.get_atom_state(retry.name)
        has_result = result is not self.NO_RESULT
        if state in self.SAVE_RESULT_STATES:
            self._storage.save(retry.name,
                               result if has_result else None,
                               state)
            # TODO(harlowja): combine this with the save to avoid a call
            # back into the persistence layer...
            if state == states.REVERTED:
                self._storage.cleanup_retry_history(retry.name, state)
        elif state == previous_state:
            # NOTE(imelnikov): nothing really changed, so we should not
            # write anything to storage and run notifications.
            return
        else:
            self._storage.set_atom_state(retry.name, state)
        uuid = self._storage.get_atom_uuid(retry.name)
        details = {
            'retry_name': retry.name,
            'retry_uuid': uuid,
            'old_state': previous_state,
        }
        if has_result:
            details['result'] = result
        self._notifier.notify(state, details)

    def schedule_execution(self, retry):
        """Mark the retry as ``RUNNING`` and submit it for execution.

        Returns whatever the retry executor's ``execute_retry`` returns.
        """
        self.change_state(retry, states.RUNNING)
        execute_retry = self._retry_executor.execute_retry
        return execute_retry(retry, self._get_retry_args(retry))

    def complete_reversion(self, retry, result):
        """Record the outcome of a reversion.

        A ``failure.Failure`` result moves the retry to ``REVERT_FAILURE``,
        anything else moves it to ``REVERTED``.
        """
        failed = isinstance(result, failure.Failure)
        outcome = states.REVERT_FAILURE if failed else states.REVERTED
        self.change_state(retry, outcome, result=result)

    def complete_execution(self, retry, result):
        """Record the outcome of an execution.

        A ``failure.Failure`` result moves the retry to ``FAILURE``,
        anything else moves it to ``SUCCESS``.
        """
        failed = isinstance(result, failure.Failure)
        outcome = states.FAILURE if failed else states.SUCCESS
        self.change_state(retry, outcome, result=result)

    def schedule_reversion(self, retry):
        """Mark the retry as ``REVERTING`` and submit it for reversion.

        The failures currently held by storage are provided to the retry
        under the ``retry_atom.REVERT_FLOW_FAILURES`` argument. Returns
        whatever the retry executor's ``revert_retry`` returns.
        """
        self.change_state(retry, states.REVERTING)
        extras = {
            retry_atom.REVERT_FLOW_FAILURES: self._storage.get_failures(),
        }
        revert_retry = self._retry_executor.revert_retry
        return revert_retry(
            retry, self._get_retry_args(retry, addons=extras, revert=True))

    def on_failure(self, retry, atom, last_failure):
        """Save an atom's failure and ask the retry how to proceed.

        The failure is stored against the retry (keyed by the failing
        atom's name) before the retry's ``on_failure`` method is called
        with the usual (non-revert) retry arguments; its return value is
        passed back to the caller.
        """
        self._storage.save_retry_failure(retry.name, atom.name, last_failure)
        kwargs = self._get_retry_args(retry)
        return retry.on_failure(**kwargs)
105