1import re
2from collections import namedtuple
3
4from botocore.utils import merge_dicts
5
6from collections import OrderedDict
7
8FilterDef = namedtuple(
9    "FilterDef",
10    [
11        # A list of object attributes to check against the filter values.
12        # Set to None if filter is not yet implemented in `moto`.
13        "attrs_to_check",
14        # Description of the filter, e.g. 'Object Identifiers'.
15        # Used in filter error messaging.
16        "description",
17    ],
18)
19
20
21def filters_from_querystring(querystring):
22    """Parses filters out of the query string computed by the
23    moto.core.responses.BaseResponse class.
24
25    :param dict[str, list[str]] querystring:
26        The `moto`-processed URL query string dictionary.
27    :returns:
28        Dict mapping filter names to filter values.
29    :rtype:
30        dict[str, list[str]]
31    """
32    response_values = {}
33    for key, value in sorted(querystring.items()):
34        match = re.search(r"Filters.Filter.(\d).Name", key)
35        if match:
36            filter_index = match.groups()[0]
37            value_prefix = "Filters.Filter.{0}.Value".format(filter_index)
38            filter_values = [
39                filter_value[0]
40                for filter_key, filter_value in querystring.items()
41                if filter_key.startswith(value_prefix)
42            ]
43            # The AWS query protocol serializes empty lists as an empty string.
44            if filter_values == [""]:
45                filter_values = []
46            response_values[value[0]] = filter_values
47    return response_values
48
49
50def get_object_value(obj, attr):
51    """Retrieves an arbitrary attribute value from an object.
52
53    Nested attributes can be specified using dot notation,
54    e.g. 'parent.child'.
55
56    :param object obj:
57        A valid Python object.
58    :param str attr:
59        The attribute name of the value to retrieve from the object.
60    :returns:
61        The attribute value, if it exists, or None.
62    :rtype:
63        any
64    """
65    keys = attr.split(".")
66    val = obj
67    for key in keys:
68        if hasattr(val, key):
69            val = getattr(val, key)
70        else:
71            return None
72    return val
73
74
75def merge_filters(filters_to_update, filters_to_merge):
76    """Given two groups of filters, merge the second into the first.
77
78    List values are appended instead of overwritten:
79
80    >>> merge_filters({'filter-name': ['value1']}, {'filter-name':['value2']})
81    >>> {'filter-name': ['value1', 'value2']}
82
83    :param filters_to_update:
84        The filters to update.
85    :type filters_to_update:
86        dict[str, list] or None
87    :param filters_to_merge:
88        The filters to merge.
89    :type filters_to_merge:
90        dict[str, list] or None
91    :returns:
92        The updated filters.
93    :rtype:
94        dict[str, list]
95    """
96    if filters_to_update is None:
97        filters_to_update = {}
98    if filters_to_merge is None:
99        filters_to_merge = {}
100    merge_dicts(filters_to_update, filters_to_merge, append_lists=True)
101    return filters_to_update
102
103
104def validate_filters(filters, filter_defs):
105    """Validates filters against a set of filter definitions.
106
107    Raises standard Python exceptions which should be caught
108    and translated to an appropriate AWS/Moto exception higher
109    up the call stack.
110
111    :param dict[str, list] filters:
112        The filters to validate.
113    :param dict[str, FilterDef] filter_defs:
114        The filter definitions to validate against.
115    :returns: None
116    :rtype: None
117    :raises KeyError:
118        if filter name not found in the filter definitions.
119    :raises ValueError:
120        if filter values is an empty list.
121    :raises NotImplementedError:
122        if `moto` does not yet support this filter.
123    """
124    for filter_name, filter_values in filters.items():
125        filter_def = filter_defs.get(filter_name)
126        if filter_def is None:
127            raise KeyError("Unrecognized filter name: {}".format(filter_name))
128        if not filter_values:
129            raise ValueError(
130                "The list of {} must not be empty.".format(filter_def.description)
131            )
132        if filter_def.attrs_to_check is None:
133            raise NotImplementedError(
134                "{} filter has not been implemented in Moto yet.".format(filter_name)
135            )
136
137
138def apply_filter(resources, filters, filter_defs):
139    """Apply an arbitrary filter to a group of resources.
140
141    :param dict[str, object] resources:
142        A dictionary mapping resource identifiers to resource objects.
143    :param dict[str, list] filters:
144        The filters to apply.
145    :param dict[str, FilterDef] filter_defs:
146        The supported filter definitions for the resource type.
147    :returns:
148        The filtered collection of resources.
149    :rtype:
150        dict[str, object]
151    """
152    resources_filtered = OrderedDict()
153    for identifier, obj in resources.items():
154        matches_filter = False
155        for filter_name, filter_values in filters.items():
156            filter_def = filter_defs.get(filter_name)
157            for attr in filter_def.attrs_to_check:
158                if get_object_value(obj, attr) in filter_values:
159                    matches_filter = True
160                    break
161            else:
162                matches_filter = False
163            if not matches_filter:
164                break
165        if matches_filter:
166            resources_filtered[identifier] = obj
167    return resources_filtered
168