1import re 2from collections import namedtuple 3 4from botocore.utils import merge_dicts 5 6from collections import OrderedDict 7 8FilterDef = namedtuple( 9 "FilterDef", 10 [ 11 # A list of object attributes to check against the filter values. 12 # Set to None if filter is not yet implemented in `moto`. 13 "attrs_to_check", 14 # Description of the filter, e.g. 'Object Identifiers'. 15 # Used in filter error messaging. 16 "description", 17 ], 18) 19 20 21def filters_from_querystring(querystring): 22 """Parses filters out of the query string computed by the 23 moto.core.responses.BaseResponse class. 24 25 :param dict[str, list[str]] querystring: 26 The `moto`-processed URL query string dictionary. 27 :returns: 28 Dict mapping filter names to filter values. 29 :rtype: 30 dict[str, list[str]] 31 """ 32 response_values = {} 33 for key, value in sorted(querystring.items()): 34 match = re.search(r"Filters.Filter.(\d).Name", key) 35 if match: 36 filter_index = match.groups()[0] 37 value_prefix = "Filters.Filter.{0}.Value".format(filter_index) 38 filter_values = [ 39 filter_value[0] 40 for filter_key, filter_value in querystring.items() 41 if filter_key.startswith(value_prefix) 42 ] 43 # The AWS query protocol serializes empty lists as an empty string. 44 if filter_values == [""]: 45 filter_values = [] 46 response_values[value[0]] = filter_values 47 return response_values 48 49 50def get_object_value(obj, attr): 51 """Retrieves an arbitrary attribute value from an object. 52 53 Nested attributes can be specified using dot notation, 54 e.g. 'parent.child'. 55 56 :param object obj: 57 A valid Python object. 58 :param str attr: 59 The attribute name of the value to retrieve from the object. 60 :returns: 61 The attribute value, if it exists, or None. 62 :rtype: 63 any 64 """ 65 keys = attr.split(".") 66 val = obj 67 for key in keys: 68 if hasattr(val, key): 69 val = getattr(val, key) 70 else: 71 return None 72 return val 73 74 75def merge_filters(filters_to_update, filters_to_merge): 76 """Given two groups of filters, merge the second into the first. 77 78 List values are appended instead of overwritten: 79 80 >>> merge_filters({'filter-name': ['value1']}, {'filter-name':['value2']}) 81 >>> {'filter-name': ['value1', 'value2']} 82 83 :param filters_to_update: 84 The filters to update. 85 :type filters_to_update: 86 dict[str, list] or None 87 :param filters_to_merge: 88 The filters to merge. 89 :type filters_to_merge: 90 dict[str, list] or None 91 :returns: 92 The updated filters. 93 :rtype: 94 dict[str, list] 95 """ 96 if filters_to_update is None: 97 filters_to_update = {} 98 if filters_to_merge is None: 99 filters_to_merge = {} 100 merge_dicts(filters_to_update, filters_to_merge, append_lists=True) 101 return filters_to_update 102 103 104def validate_filters(filters, filter_defs): 105 """Validates filters against a set of filter definitions. 106 107 Raises standard Python exceptions which should be caught 108 and translated to an appropriate AWS/Moto exception higher 109 up the call stack. 110 111 :param dict[str, list] filters: 112 The filters to validate. 113 :param dict[str, FilterDef] filter_defs: 114 The filter definitions to validate against. 115 :returns: None 116 :rtype: None 117 :raises KeyError: 118 if filter name not found in the filter definitions. 119 :raises ValueError: 120 if filter values is an empty list. 121 :raises NotImplementedError: 122 if `moto` does not yet support this filter. 123 """ 124 for filter_name, filter_values in filters.items(): 125 filter_def = filter_defs.get(filter_name) 126 if filter_def is None: 127 raise KeyError("Unrecognized filter name: {}".format(filter_name)) 128 if not filter_values: 129 raise ValueError( 130 "The list of {} must not be empty.".format(filter_def.description) 131 ) 132 if filter_def.attrs_to_check is None: 133 raise NotImplementedError( 134 "{} filter has not been implemented in Moto yet.".format(filter_name) 135 ) 136 137 138def apply_filter(resources, filters, filter_defs): 139 """Apply an arbitrary filter to a group of resources. 140 141 :param dict[str, object] resources: 142 A dictionary mapping resource identifiers to resource objects. 143 :param dict[str, list] filters: 144 The filters to apply. 145 :param dict[str, FilterDef] filter_defs: 146 The supported filter definitions for the resource type. 147 :returns: 148 The filtered collection of resources. 149 :rtype: 150 dict[str, object] 151 """ 152 resources_filtered = OrderedDict() 153 for identifier, obj in resources.items(): 154 matches_filter = False 155 for filter_name, filter_values in filters.items(): 156 filter_def = filter_defs.get(filter_name) 157 for attr in filter_def.attrs_to_check: 158 if get_object_value(obj, attr) in filter_values: 159 matches_filter = True 160 break 161 else: 162 matches_filter = False 163 if not matches_filter: 164 break 165 if matches_filter: 166 resources_filtered[identifier] = obj 167 return resources_filtered 168