1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2015 OpenStack Foundation.
4# All Rights Reserved.
5#
6#    Licensed under the Apache License, Version 2.0 (the "License"); you may
7#    not use this file except in compliance with the License. You may obtain
8#    a copy of the License at
9#
10#         http://www.apache.org/licenses/LICENSE-2.0
11#
12#    Unless required by applicable law or agreed to in writing, software
13#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15#    License for the specific language governing permissions and limitations
16#    under the License.
17
18import abc
19import ast
20import inspect
21
22import stevedore
23
24registered_checks = {}
25extension_checks = None
26
27
28def get_extensions():
29    global extension_checks
30    if extension_checks is None:
31        em = stevedore.ExtensionManager('oslo.policy.rule_checks',
32                                        invoke_on_load=False)
33        extension_checks = {
34            extension.name: extension.plugin
35            for extension in em
36        }
37    return extension_checks
38
39
40def _check(rule, target, creds, enforcer, current_rule):
41    """Evaluate the rule.
42
43    This private method is meant to be used by the enforcer to call
44    the rule. It can also be used by built-in checks that have nested
45    rules.
46
47    We use a private function because it makes it easier to change the
48    API without having an impact on subclasses not defined within the
49    oslo.policy library.
50
51    We don't put this logic in Enforcer.enforce() and invoke that
52    method recursively because that changes the BaseCheck API to
53    require that the enforcer argument to __call__() be a valid
54    Enforcer instance (as evidenced by all of the breaking unit
55    tests).
56
57    We don't put this in a private method of BaseCheck because that
58    propagates the problem of extending the list of arguments to
59    __call__() if subclasses change the implementation of the
60    function.
61
62    :param rule: A check object.
63    :type rule: BaseCheck
64    :param target: Attributes of the object of the operation.
65    :type target: dict
66    :param creds: Attributes of the user performing the operation.
67    :type creds: dict
68    :param enforcer: The Enforcer being used.
69    :type enforcer: Enforcer
70    :param current_rule: The name of the policy being checked.
71    :type current_rule: str
72
73    """
74    # Evaluate the rule
75    argspec = inspect.getfullargspec(rule.__call__)
76    rule_args = [target, creds, enforcer]
77    # Check if the rule argument must be included or not
78    if len(argspec.args) > 4:
79        rule_args.append(current_rule)
80    return rule(*rule_args)
81
82
83class BaseCheck(metaclass=abc.ABCMeta):
84    """Abstract base class for Check classes."""
85
86    scope_types = None
87
88    @abc.abstractmethod
89    def __str__(self):
90        """String representation of the Check tree rooted at this node."""
91
92        pass
93
94    @abc.abstractmethod
95    def __call__(self, target, cred, enforcer, current_rule=None):
96        """Triggers if instance of the class is called.
97
98        Performs the check. Returns False to reject the access or a
99        true value (not necessary True) to accept the access.
100        """
101
102        pass
103
104
105class FalseCheck(BaseCheck):
106    """A policy check that always returns ``False`` (disallow)."""
107
108    def __str__(self):
109        """Return a string representation of this check."""
110
111        return '!'
112
113    def __call__(self, target, cred, enforcer, current_rule=None):
114        """Check the policy."""
115
116        return False
117
118
119class TrueCheck(BaseCheck):
120    """A policy check that always returns ``True`` (allow)."""
121
122    def __str__(self):
123        """Return a string representation of this check."""
124
125        return '@'
126
127    def __call__(self, target, cred, enforcer, current_rule=None):
128        """Check the policy."""
129
130        return True
131
132
133class Check(BaseCheck):
134    def __init__(self, kind, match):
135        self.kind = kind
136        self.match = match
137
138    def __str__(self):
139        """Return a string representation of this check."""
140
141        return '%s:%s' % (self.kind, self.match)
142
143
144class NotCheck(BaseCheck):
145    def __init__(self, rule):
146        self.rule = rule
147
148    def __str__(self):
149        """Return a string representation of this check."""
150
151        return 'not %s' % self.rule
152
153    def __call__(self, target, cred, enforcer, current_rule=None):
154        """Check the policy.
155
156        Returns the logical inverse of the wrapped check.
157        """
158
159        return not _check(self.rule, target, cred, enforcer, current_rule)
160
161
162class AndCheck(BaseCheck):
163    def __init__(self, rules):
164        self.rules = rules
165
166    def __str__(self):
167        """Return a string representation of this check."""
168
169        return '(%s)' % ' and '.join(str(r) for r in self.rules)
170
171    def __call__(self, target, cred, enforcer, current_rule=None):
172        """Check the policy.
173
174        Requires that all rules accept in order to return True.
175        """
176
177        for rule in self.rules:
178            if not _check(rule, target, cred, enforcer, current_rule):
179                return False
180
181        return True
182
183    def add_check(self, rule):
184        """Adds rule to be tested.
185
186        Allows addition of another rule to the list of rules that will
187        be tested.
188
189        :returns: self
190        :rtype: :class:`.AndCheck`
191        """
192
193        self.rules.append(rule)
194        return self
195
196
197class OrCheck(BaseCheck):
198    def __init__(self, rules):
199        self.rules = rules
200
201    def __str__(self):
202        """Return a string representation of this check."""
203
204        return '(%s)' % ' or '.join(str(r) for r in self.rules)
205
206    def __call__(self, target, cred, enforcer, current_rule=None):
207        """Check the policy.
208
209        Requires that at least one rule accept in order to return True.
210        """
211
212        for rule in self.rules:
213            if _check(rule, target, cred, enforcer, current_rule):
214                return True
215        return False
216
217    def add_check(self, rule):
218        """Adds rule to be tested.
219
220        Allows addition of another rule to the list of rules that will
221        be tested.  Returns the OrCheck object for convenience.
222        """
223
224        self.rules.append(rule)
225        return self
226
227    def pop_check(self):
228        """Pops the last check from the list and returns them
229
230        :returns: self, the popped check
231        :rtype: :class:`.OrCheck`, class:`.Check`
232        """
233
234        check = self.rules.pop()
235        return self, check
236
237
238def register(name, func=None):
239    # Perform the actual decoration by registering the function or
240    # class.  Returns the function or class for compliance with the
241    # decorator interface.
242    def decorator(func):
243        registered_checks[name] = func
244        return func
245
246    # If the function or class is given, do the registration
247    if func:
248        return decorator(func)
249
250    return decorator
251
252
253@register('rule')
254class RuleCheck(Check):
255    def __call__(self, target, creds, enforcer, current_rule=None):
256        try:
257            return _check(
258                rule=enforcer.rules[self.match],
259                target=target,
260                creds=creds,
261                enforcer=enforcer,
262                current_rule=current_rule,
263            )
264        except KeyError:
265            # We don't have any matching rule; fail closed
266            return False
267
268
269@register('role')
270class RoleCheck(Check):
271    """Check that there is a matching role in the ``creds`` dict."""
272
273    def __call__(self, target, creds, enforcer, current_rule=None):
274        try:
275            match = self.match % target
276        except KeyError:
277            # While doing RoleCheck if key not
278            # present in Target return false
279            return False
280        if 'roles' in creds:
281            return match.lower() in [x.lower() for x in creds['roles']]
282        return False
283
284
285@register(None)
286class GenericCheck(Check):
287    """Check an individual match.
288
289    Matches look like:
290
291        - tenant:%(tenant_id)s
292        - role:compute:admin
293        - True:%(user.enabled)s
294        - 'Member':%(role.name)s
295    """
296
297    def _find_in_dict(self, test_value, path_segments, match):
298        '''Searches for a match in the dictionary.
299
300        test_value is a reference inside the dictionary. Since the process is
301        recursive, each call to _find_in_dict will be one level deeper.
302
303        path_segments is the segments of the path to search.  The recursion
304        ends when there are no more segments of path.
305
306        When specifying a value inside a list, each element of the list is
307        checked for a match. If the value is found within any of the sub lists
308        the check succeeds; The check only fails if the entry is not in any of
309        the sublists.
310
311        '''
312
313        if len(path_segments) == 0:
314            return match == str(test_value)
315        key, path_segments = path_segments[0], path_segments[1:]
316        try:
317            test_value = test_value[key]
318        except KeyError:
319            return False
320        if isinstance(test_value, list):
321            for val in test_value:
322                if self._find_in_dict(val, path_segments, match):
323                    return True
324            return False
325        else:
326            return self._find_in_dict(test_value, path_segments, match)
327
328    def __call__(self, target, creds, enforcer, current_rule=None):
329
330        try:
331            match = self.match % target
332        except KeyError:
333            # While doing GenericCheck if key not
334            # present in Target return false
335            return False
336        try:
337            # Try to interpret self.kind as a literal
338            test_value = ast.literal_eval(self.kind)
339            return match == str(test_value)
340
341        except ValueError:
342            pass
343
344        path_segments = self.kind.split('.')
345        return self._find_in_dict(creds, path_segments, match)
346