1# -*- coding: utf-8 -*- 2 3# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may 6# not use this file except in compliance with the License. You may obtain 7# a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations 15# under the License. 16 17import abc 18 19from oslo_utils import reflection 20import six 21 22# Link metadata keys that have inherent/special meaning. 23# 24# This key denotes the link is an invariant that ensures the order is 25# correctly preserved. 26LINK_INVARIANT = 'invariant' 27# This key denotes the link is a manually/user-specified. 28LINK_MANUAL = 'manual' 29# This key denotes the link was created when resolving/compiling retries. 30LINK_RETRY = 'retry' 31# This key denotes the link was created due to symbol constraints and the 32# value will be a set of names that the constraint ensures are satisfied. 33LINK_REASONS = 'reasons' 34# 35# This key denotes a callable that will determine if the target is visited. 36LINK_DECIDER = 'decider' 37 38# Chop off full module names of patterns that are built-in to taskflow... 39_CHOP_PAT = "taskflow.patterns." 40_CHOP_PAT_LEN = len(_CHOP_PAT) 41 42# This key denotes the depth the decider will apply (defaulting to all). 43LINK_DECIDER_DEPTH = 'decider_depth' 44 45 46@six.add_metaclass(abc.ABCMeta) 47class Flow(object): 48 """The base abstract class of all flow implementations. 49 50 A flow is a structure that defines relationships between tasks. You can 51 add tasks and other flows (as subflows) to the flow, and the flow provides 52 a way to implicitly or explicitly define how they are interdependent. 53 Exact structure of the relationships is defined by concrete 54 implementation, while this class defines common interface and adds 55 human-readable (not necessary unique) name. 56 57 NOTE(harlowja): if a flow is placed in another flow as a subflow, a desired 58 way to compose flows together, then it is valid and permissible that during 59 compilation the subflow & parent flow *may* be flattened into a new flow. 60 """ 61 62 def __init__(self, name, retry=None): 63 self._name = six.text_type(name) 64 self._retry = retry 65 # NOTE(akarpinska): if retry doesn't have a name, 66 # the name of its owner will be assigned 67 if self._retry is not None and self._retry.name is None: 68 self._retry.name = self.name + "_retry" 69 70 @property 71 def name(self): 72 """A non-unique name for this flow (human readable).""" 73 return self._name 74 75 @property 76 def retry(self): 77 """The associated flow retry controller. 78 79 This retry controller object will affect & control how (and if) this 80 flow and its contained components retry when execution is underway and 81 a failure occurs. 82 """ 83 return self._retry 84 85 @abc.abstractmethod 86 def add(self, *items): 87 """Adds a given item/items to this flow.""" 88 89 @abc.abstractmethod 90 def __len__(self): 91 """Returns how many items are in this flow.""" 92 93 @abc.abstractmethod 94 def __iter__(self): 95 """Iterates over the children of the flow.""" 96 97 @abc.abstractmethod 98 def iter_links(self): 99 """Iterates over dependency links between children of the flow. 100 101 Iterates over 3-tuples ``(A, B, meta)``, where 102 * ``A`` is a child (atom or subflow) link starts from; 103 * ``B`` is a child (atom or subflow) link points to; it is 104 said that ``B`` depends on ``A`` or ``B`` requires ``A``; 105 * ``meta`` is link metadata, a dictionary. 106 """ 107 108 @abc.abstractmethod 109 def iter_nodes(self): 110 """Iterate over nodes of the flow. 111 112 Iterates over 2-tuples ``(A, meta)``, where 113 * ``A`` is a child (atom or subflow) of current flow; 114 * ``meta`` is link metadata, a dictionary. 115 """ 116 117 def __str__(self): 118 cls_name = reflection.get_class_name(self) 119 if cls_name.startswith(_CHOP_PAT): 120 cls_name = cls_name[_CHOP_PAT_LEN:] 121 return "%s: %s(len=%d)" % (cls_name, self.name, len(self)) 122 123 @property 124 def provides(self): 125 """Set of symbol names provided by the flow.""" 126 provides = set() 127 if self._retry is not None: 128 provides.update(self._retry.provides) 129 for item in self: 130 provides.update(item.provides) 131 return frozenset(provides) 132 133 @abc.abstractproperty 134 def requires(self): 135 """Set of *unsatisfied* symbol names required by the flow.""" 136