"""Utility functions for STIX2 data markings."""

import collections

import six

from stix2 import exceptions, utils

# Identifiers of the four predefined TLP marking definitions, paired with
# their TLP color.  Kept as a tuple of pairs (not a dict) so the lookup uses
# plain ``==`` comparison, exactly like an if/elif chain would.
_TLP_MARKING_IDS = (
    ('white', 'marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9'),
    ('green', 'marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da'),
    ('amber', 'marking-definition--f88d31f6-486f-44da-b317-01333bde0b82'),
    ('red', 'marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed'),
)

# ``created`` timestamp shared by every predefined TLP marking definition.
_TLP_CREATED = '2017-01-20T00:00:00.000Z'


def _evaluate_expression(obj, selector):
    """Walk an SDO or SRO generating selectors to match against ``selector``.

    The first property whose dotted path equals ``selector`` and whose value
    is truthy is returned. A property that exists but holds a falsy value
    (empty string, empty list, 0, ...) is treated as "not present".

    Args:
        obj: An SDO or SRO object.
        selector (str): A string following the selector syntax.

    Returns:
        list: Values contained in matching property. Otherwise empty list.

    """
    for items, value in iterpath(obj):
        path = '.'.join(items)

        if path == selector and value:
            return [value]

    return []


def _validate_selector(obj, selector):
    """Return True if ``selector`` matches a populated property of ``obj``."""
    return bool(_evaluate_expression(obj, selector))


def _get_marking_id(marking):
    """Return the id of a MarkingDefinition, or ``marking`` itself otherwise."""
    # The class is recognised by name instead of isinstance() so that this
    # module does not have to import it.
    if type(marking).__name__ == 'MarkingDefinition':  # avoid circular import
        return marking.id
    return marking


def validate(obj, selectors):
    """Given an SDO or SRO, check that each selector is valid.

    Args:
        obj: An SDO or SRO object.
        selectors: An iterable of selector strings.

    Raises:
        InvalidSelectorError: If ``selectors`` is empty/None, or if any
            selector does not match a populated property of ``obj``.

    """
    if selectors:
        for s in selectors:
            if not _validate_selector(obj, s):
                raise exceptions.InvalidSelectorError(obj, s)
        return

    raise exceptions.InvalidSelectorError(obj, selectors)


def convert_to_list(data):
    """Convert input into a list for further processing.

    Returns:
        list: ``data`` itself if it already is a list, otherwise ``[data]``.
        ``None`` is returned unchanged when ``data`` is None.

    """
    if data is not None:
        if isinstance(data, list):
            return data
        else:
            return [data]


def convert_to_marking_list(data):
    """Convert input into a list of marking identifiers.

    Returns:
        list: One identifier per input marking; MarkingDefinition objects
        are replaced by their ``id``. ``None`` is returned unchanged when
        ``data`` is None.

    """
    if data is not None:
        if isinstance(data, list):
            return [_get_marking_id(x) for x in data]
        else:
            return [_get_marking_id(data)]


def compress_markings(granular_markings):
    """Compress granular markings list.

    If more than one granular marking shares the same marking identifier
    (or the same ``lang``), they are collapsed into a single granular
    marking whose selectors are the sorted union of all their selectors.

    Example:
        >>> compress_markings([
        ...     {
        ...         "selectors": [
        ...             "description"
        ...         ],
        ...         "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
        ...     },
        ...     {
        ...         "selectors": [
        ...             "name"
        ...         ],
        ...         "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
        ...     }
        ... ])
        [
            {
                "selectors": [
                    "description",
                    "name"
                ],
                "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
            }
        ]

    Args:
        granular_markings: The granular markings list property present in a
            SDO or SRO.

    Returns:
        list: A list with all markings collapsed. ``None`` when
        ``granular_markings`` is empty or None.

    """
    if not granular_markings:
        return None

    # marking_ref / lang value -> set of every selector it applies to.
    map_ = collections.defaultdict(set)

    for granular_marking in granular_markings:
        for key in ('marking_ref', 'lang'):
            value = granular_marking.get(key)
            if value:
                map_[value].update(granular_marking.get('selectors'))

    # utils.is_marking() decides whether a collected key is written back as
    # a ``marking_ref`` entry or as a ``lang`` entry.
    compressed = [
        {'marking_ref': item, 'selectors': sorted(selectors)}
        if utils.is_marking(item) else
        {'lang': item, 'selectors': sorted(selectors)}
        for item, selectors in six.iteritems(map_)
    ]

    return compressed


def expand_markings(granular_markings):
    """Expand granular markings list.

    If there is more than one selector per granular marking, it will be
    expanded into one granular marking per selector, each using the same
    ``marking_ref`` (or ``lang``).

    Example:
        >>> expand_markings([
        ...     {
        ...         "selectors": [
        ...             "description",
        ...             "name"
        ...         ],
        ...         "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
        ...     }
        ... ])
        [
            {
                "selectors": [
                    "description"
                ],
                "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
            },
            {
                "selectors": [
                    "name"
                ],
                "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
            }
        ]

    Args:
        granular_markings: The granular markings list property present in a
            SDO or SRO.

    Returns:
        list: A list with all markings expanded.

    """
    expanded = []

    for marking in granular_markings:
        selectors = marking.get('selectors')
        marking_ref = marking.get('marking_ref')
        lang = marking.get('lang')

        if marking_ref:
            expanded.extend(
                [
                    {'marking_ref': marking_ref, 'selectors': [selector]}
                    for selector in selectors
                ],
            )
        if lang:
            expanded.extend(
                [
                    {'lang': lang, 'selectors': [selector]}
                    for selector in selectors
                ],
            )

    return expanded


def build_granular_marking(granular_marking):
    """Return a dictionary with the required structure for a granular marking.

    The given markings are expanded to one entry per selector and stored
    under the ``granular_markings`` key.
    """
    return {'granular_markings': expand_markings(granular_marking)}


def iterpath(obj, path=None):
    """Generator which walks the input ``obj`` model.

    Each iteration yields a tuple containing a list of ancestors and the
    property value. Properties are visited in sorted key order; list
    elements are addressed as ``[<position>]``.

    Note:
        The very same ``path`` list object is yielded every time and is
        mutated as the walk proceeds. Consume it immediately (e.g. join it)
        or copy it if it must outlive the current iteration.

    Args:
        obj: An SDO or SRO object.
        path: None, used recursively to store ancestors.

    Example:
        >>> for item in iterpath(obj):
        ...     print(item)
        (['type'], 'campaign')
        ...
        (['cybox', 'objects', '[0]', 'hashes', 'sha1'], 'cac35ec206d868b7d7cb0b55f31d9425b075082b')

    Returns:
        tuple: Containing two items: a list of ancestors and the
            property value.

    """
    if path is None:
        path = []

    for varname, varobj in sorted(six.iteritems(obj)):
        path.append(varname)
        yield (path, varobj)

        if isinstance(varobj, dict):

            for item in iterpath(varobj, path):
                yield item

        elif isinstance(varobj, list):

            # enumerate() yields each element's real position.  Looking the
            # position up with list.index() would return the first *equal*
            # element, so duplicated values would all be reported as the
            # same index and later positions could never be selected.
            for position, item in enumerate(varobj):
                path.append('[{0}]'.format(position))

                yield (path, item)

                if isinstance(item, dict):
                    for descendant in iterpath(item, path):
                        yield descendant

                path.pop()

        path.pop()


def _expected_tlp_definition(color, marking_id, spec_version):
    """Return the JSON text of the predefined TLP marking for ``color``."""
    common = (
        '{"created": "%s", "definition": {"tlp": "%s"}, "definition_type": "tlp",'
        ' "id": "%s",'
    ) % (_TLP_CREATED, color, marking_id)

    if spec_version == '2.0':
        return common + ' "type": "marking-definition"}'

    # Any other spec version uses the 2.1 form, which adds a name and an
    # explicit spec_version.
    return (
        common +
        ' "name": "TLP:%s", "type": "marking-definition", "spec_version": "2.1"}'
        % color.upper()
    )


def check_tlp_marking(marking_obj, spec_version):
    """Check that a TLP marking definition is one of the predefined ones.

    Objects whose ``definition_type`` is not ``"tlp"`` are ignored.

    Args:
        marking_obj: A marking definition, indexable by property name.
        spec_version (str): ``'2.0'`` selects the 2.0 form of the expected
            definition reported in errors; any other value selects the 2.1
            form.

    Raises:
        TLPMarkingDefinitionError: If the TLP color is unknown, or if the
            object's ``id`` or ``created`` differs from the predefined
            marking definition for its color.

    """
    # Specific TLP Marking validation case.
    if marking_obj["definition_type"] != "tlp":
        return

    color = marking_obj["definition"]["tlp"]

    for known_color, expected_id in _TLP_MARKING_IDS:
        if color == known_color:
            break
    else:
        raise exceptions.TLPMarkingDefinitionError(marking_obj["id"], "Does not match any TLP Marking definition")

    expected = _expected_tlp_definition(known_color, expected_id, spec_version)

    if marking_obj["id"] != expected_id:
        raise exceptions.TLPMarkingDefinitionError(marking_obj["id"], expected)

    # The timestamp is only examined once the id is known to be correct.
    created = utils.format_datetime(marking_obj["created"])
    if created != _TLP_CREATED:
        raise exceptions.TLPMarkingDefinitionError(created, expected)