1import json 2import re 3 4from moto.iam.exceptions import MalformedPolicyDocument 5 6 7VALID_TOP_ELEMENTS = ["Version", "Id", "Statement", "Conditions"] 8 9VALID_VERSIONS = ["2008-10-17", "2012-10-17"] 10 11VALID_STATEMENT_ELEMENTS = [ 12 "Sid", 13 "Action", 14 "NotAction", 15 "Resource", 16 "NotResource", 17 "Effect", 18 "Principal", 19 "NotPrincipal", 20 "Condition", 21] 22 23VALID_EFFECTS = ["Allow", "Deny"] 24 25VALID_CONDITIONS = [ 26 "StringEquals", 27 "StringNotEquals", 28 "StringEqualsIgnoreCase", 29 "StringNotEqualsIgnoreCase", 30 "StringLike", 31 "StringNotLike", 32 "NumericEquals", 33 "NumericNotEquals", 34 "NumericLessThan", 35 "NumericLessThanEquals", 36 "NumericGreaterThan", 37 "NumericGreaterThanEquals", 38 "DateEquals", 39 "DateNotEquals", 40 "DateLessThan", 41 "DateLessThanEquals", 42 "DateGreaterThan", 43 "DateGreaterThanEquals", 44 "Bool", 45 "BinaryEquals", 46 "IpAddress", 47 "NotIpAddress", 48 "ArnEquals", 49 "ArnLike", 50 "ArnNotEquals", 51 "ArnNotLike", 52 "Null", 53] 54 55VALID_CONDITION_PREFIXES = ["ForAnyValue:", "ForAllValues:"] 56 57VALID_CONDITION_POSTFIXES = ["IfExists"] 58 59SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS = { 60 "iam": "IAM resource {resource} cannot contain region information.", 61 "s3": "Resource {resource} can not contain region information.", 62} 63 64VALID_RESOURCE_PATH_STARTING_VALUES = { 65 "iam": { 66 "values": [ 67 "user/", 68 "federated-user/", 69 "role/", 70 "group/", 71 "instance-profile/", 72 "mfa/", 73 "server-certificate/", 74 "policy/", 75 "sms-mfa/", 76 "saml-provider/", 77 "oidc-provider/", 78 "report/", 79 "access-report/", 80 ], 81 "error_message": 'IAM resource path must either be "*" or start with {values}.', 82 } 83} 84 85 86class IAMPolicyDocumentValidator: 87 def __init__(self, policy_document): 88 self._policy_document = policy_document 89 self._policy_json = {} 90 self._statements = [] 91 self._resource_error = "" # the first resource error found that does not generate a legacy parsing error 92 93 def validate(self): 94 try: 95 self._validate_syntax() 96 except Exception: 97 raise MalformedPolicyDocument("Syntax errors in policy.") 98 try: 99 self._validate_version() 100 except Exception: 101 raise MalformedPolicyDocument( 102 "Policy document must be version 2012-10-17 or greater." 103 ) 104 try: 105 self._perform_first_legacy_parsing() 106 self._validate_resources_for_formats() 107 self._validate_not_resources_for_formats() 108 except Exception: 109 raise MalformedPolicyDocument("The policy failed legacy parsing") 110 try: 111 self._validate_sid_uniqueness() 112 except Exception: 113 raise MalformedPolicyDocument( 114 "Statement IDs (SID) in a single policy must be unique." 115 ) 116 try: 117 self._validate_action_like_exist() 118 except Exception: 119 raise MalformedPolicyDocument("Policy statement must contain actions.") 120 try: 121 self._validate_resource_exist() 122 except Exception: 123 raise MalformedPolicyDocument("Policy statement must contain resources.") 124 125 if self._resource_error != "": 126 raise MalformedPolicyDocument(self._resource_error) 127 128 self._validate_actions_for_prefixes() 129 self._validate_not_actions_for_prefixes() 130 131 def _validate_syntax(self): 132 self._policy_json = json.loads(self._policy_document) 133 assert isinstance(self._policy_json, dict) 134 self._validate_top_elements() 135 self._validate_version_syntax() 136 self._validate_id_syntax() 137 self._validate_statements_syntax() 138 139 def _validate_top_elements(self): 140 top_elements = self._policy_json.keys() 141 for element in top_elements: 142 assert element in VALID_TOP_ELEMENTS 143 144 def _validate_version_syntax(self): 145 if "Version" in self._policy_json: 146 assert self._policy_json["Version"] in VALID_VERSIONS 147 148 def _validate_version(self): 149 assert self._policy_json["Version"] == "2012-10-17" 150 151 def _validate_sid_uniqueness(self): 152 sids = [] 153 for statement in self._statements: 154 if "Sid" in statement: 155 statementId = statement["Sid"] 156 if statementId: 157 assert statementId not in sids 158 sids.append(statementId) 159 160 def _validate_statements_syntax(self): 161 assert "Statement" in self._policy_json 162 assert isinstance(self._policy_json["Statement"], (dict, list)) 163 164 if isinstance(self._policy_json["Statement"], dict): 165 self._statements.append(self._policy_json["Statement"]) 166 else: 167 self._statements += self._policy_json["Statement"] 168 169 assert self._statements 170 for statement in self._statements: 171 self._validate_statement_syntax(statement) 172 173 @staticmethod 174 def _validate_statement_syntax(statement): 175 assert isinstance(statement, dict) 176 for statement_element in statement.keys(): 177 assert statement_element in VALID_STATEMENT_ELEMENTS 178 179 assert "Resource" not in statement or "NotResource" not in statement 180 assert "Action" not in statement or "NotAction" not in statement 181 182 IAMPolicyDocumentValidator._validate_effect_syntax(statement) 183 IAMPolicyDocumentValidator._validate_action_syntax(statement) 184 IAMPolicyDocumentValidator._validate_not_action_syntax(statement) 185 IAMPolicyDocumentValidator._validate_resource_syntax(statement) 186 IAMPolicyDocumentValidator._validate_not_resource_syntax(statement) 187 IAMPolicyDocumentValidator._validate_condition_syntax(statement) 188 IAMPolicyDocumentValidator._validate_sid_syntax(statement) 189 190 @staticmethod 191 def _validate_effect_syntax(statement): 192 assert "Effect" in statement 193 assert isinstance(statement["Effect"], str) 194 assert statement["Effect"].lower() in [ 195 allowed_effect.lower() for allowed_effect in VALID_EFFECTS 196 ] 197 198 @staticmethod 199 def _validate_action_syntax(statement): 200 IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax( 201 statement, "Action" 202 ) 203 204 @staticmethod 205 def _validate_not_action_syntax(statement): 206 IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax( 207 statement, "NotAction" 208 ) 209 210 @staticmethod 211 def _validate_resource_syntax(statement): 212 IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax( 213 statement, "Resource" 214 ) 215 216 @staticmethod 217 def _validate_not_resource_syntax(statement): 218 IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax( 219 statement, "NotResource" 220 ) 221 222 @staticmethod 223 def _validate_string_or_list_of_strings_syntax(statement, key): 224 if key in statement: 225 assert isinstance(statement[key], (str, list)) 226 if isinstance(statement[key], list): 227 for resource in statement[key]: 228 assert isinstance(resource, str) 229 230 @staticmethod 231 def _validate_condition_syntax(statement): 232 if "Condition" in statement: 233 assert isinstance(statement["Condition"], dict) 234 for condition_key, condition_value in statement["Condition"].items(): 235 assert isinstance(condition_value, dict) 236 for ( 237 condition_element_key, 238 condition_element_value, 239 ) in condition_value.items(): 240 assert isinstance(condition_element_value, (list, str)) 241 242 if ( 243 IAMPolicyDocumentValidator._strip_condition_key(condition_key) 244 not in VALID_CONDITIONS 245 ): 246 assert not condition_value # empty dict 247 248 @staticmethod 249 def _strip_condition_key(condition_key): 250 for valid_prefix in VALID_CONDITION_PREFIXES: 251 if condition_key.startswith(valid_prefix): 252 condition_key = condition_key[len(valid_prefix) :] 253 break # strip only the first match 254 255 for valid_postfix in VALID_CONDITION_POSTFIXES: 256 if condition_key.endswith(valid_postfix): 257 condition_key = condition_key[: -len(valid_postfix)] 258 break # strip only the first match 259 260 return condition_key 261 262 @staticmethod 263 def _validate_sid_syntax(statement): 264 if "Sid" in statement: 265 assert isinstance(statement["Sid"], str) 266 267 def _validate_id_syntax(self): 268 if "Id" in self._policy_json: 269 assert isinstance(self._policy_json["Id"], str) 270 271 def _validate_resource_exist(self): 272 for statement in self._statements: 273 assert "Resource" in statement or "NotResource" in statement 274 if "Resource" in statement and isinstance(statement["Resource"], list): 275 assert statement["Resource"] 276 elif "NotResource" in statement and isinstance( 277 statement["NotResource"], list 278 ): 279 assert statement["NotResource"] 280 281 def _validate_action_like_exist(self): 282 for statement in self._statements: 283 assert "Action" in statement or "NotAction" in statement 284 if "Action" in statement and isinstance(statement["Action"], list): 285 assert statement["Action"] 286 elif "NotAction" in statement and isinstance(statement["NotAction"], list): 287 assert statement["NotAction"] 288 289 def _validate_actions_for_prefixes(self): 290 self._validate_action_like_for_prefixes("Action") 291 292 def _validate_not_actions_for_prefixes(self): 293 self._validate_action_like_for_prefixes("NotAction") 294 295 def _validate_action_like_for_prefixes(self, key): 296 for statement in self._statements: 297 if key in statement: 298 if isinstance(statement[key], str): 299 self._validate_action_prefix(statement[key]) 300 else: 301 for action in statement[key]: 302 self._validate_action_prefix(action) 303 304 @staticmethod 305 def _validate_action_prefix(action): 306 action_parts = action.split(":") 307 if len(action_parts) == 1 and action_parts[0] != "*": 308 raise MalformedPolicyDocument( 309 "Actions/Conditions must be prefaced by a vendor, e.g., iam, sdb, ec2, etc." 310 ) 311 elif len(action_parts) > 2: 312 raise MalformedPolicyDocument( 313 "Actions/Condition can contain only one colon." 314 ) 315 316 vendor_pattern = re.compile(r"[^a-zA-Z0-9\-.]") 317 if action_parts[0] != "*" and vendor_pattern.search(action_parts[0]): 318 raise MalformedPolicyDocument( 319 "Vendor {vendor} is not valid".format(vendor=action_parts[0]) 320 ) 321 322 def _validate_resources_for_formats(self): 323 self._validate_resource_like_for_formats("Resource") 324 325 def _validate_not_resources_for_formats(self): 326 self._validate_resource_like_for_formats("NotResource") 327 328 def _validate_resource_like_for_formats(self, key): 329 for statement in self._statements: 330 if key in statement: 331 if isinstance(statement[key], str): 332 self._validate_resource_format(statement[key]) 333 else: 334 for resource in sorted(statement[key], reverse=True): 335 self._validate_resource_format(resource) 336 if self._resource_error == "": 337 IAMPolicyDocumentValidator._legacy_parse_resource_like( 338 statement, key 339 ) 340 341 def _validate_resource_format(self, resource): 342 if resource != "*": 343 resource_partitions = resource.partition(":") 344 345 if resource_partitions[1] == "": 346 self._resource_error = 'Resource {resource} must be in ARN format or "*".'.format( 347 resource=resource 348 ) 349 return 350 351 resource_partitions = resource_partitions[2].partition(":") 352 if resource_partitions[0] not in ["aws", "*"]: 353 remaining_resource_parts = resource_partitions[2].split(":") 354 355 arn1 = ( 356 remaining_resource_parts[0] 357 if remaining_resource_parts[0] != "" 358 or len(remaining_resource_parts) > 1 359 else "*" 360 ) 361 arn2 = ( 362 remaining_resource_parts[1] 363 if len(remaining_resource_parts) > 1 364 else "*" 365 ) 366 arn3 = ( 367 remaining_resource_parts[2] 368 if len(remaining_resource_parts) > 2 369 else "*" 370 ) 371 arn4 = ( 372 ":".join(remaining_resource_parts[3:]) 373 if len(remaining_resource_parts) > 3 374 else "*" 375 ) 376 self._resource_error = 'Partition "{partition}" is not valid for resource "arn:{partition}:{arn1}:{arn2}:{arn3}:{arn4}".'.format( 377 partition=resource_partitions[0], 378 arn1=arn1, 379 arn2=arn2, 380 arn3=arn3, 381 arn4=arn4, 382 ) 383 return 384 385 if resource_partitions[1] != ":": 386 self._resource_error = "Resource vendor must be fully qualified and cannot contain regexes." 387 return 388 389 resource_partitions = resource_partitions[2].partition(":") 390 391 service = resource_partitions[0] 392 393 if service in SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS.keys() and not resource_partitions[ 394 2 395 ].startswith( 396 ":" 397 ): 398 self._resource_error = SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS[ 399 service 400 ].format( 401 resource=resource 402 ) 403 return 404 405 resource_partitions = resource_partitions[2].partition(":") 406 resource_partitions = resource_partitions[2].partition(":") 407 408 if service in VALID_RESOURCE_PATH_STARTING_VALUES.keys(): 409 valid_start = False 410 for valid_starting_value in VALID_RESOURCE_PATH_STARTING_VALUES[ 411 service 412 ]["values"]: 413 if resource_partitions[2].startswith(valid_starting_value): 414 valid_start = True 415 break 416 if not valid_start: 417 self._resource_error = VALID_RESOURCE_PATH_STARTING_VALUES[service][ 418 "error_message" 419 ].format( 420 values=", ".join( 421 VALID_RESOURCE_PATH_STARTING_VALUES[service]["values"] 422 ) 423 ) 424 425 def _perform_first_legacy_parsing(self): 426 """This method excludes legacy parsing resources, since that have to be done later.""" 427 for statement in self._statements: 428 self._legacy_parse_statement(statement) 429 430 @staticmethod 431 def _legacy_parse_statement(statement): 432 assert statement["Effect"] in VALID_EFFECTS # case-sensitive matching 433 if "Condition" in statement: 434 for condition_key, condition_value in statement["Condition"].items(): 435 IAMPolicyDocumentValidator._legacy_parse_condition( 436 condition_key, condition_value 437 ) 438 439 @staticmethod 440 def _legacy_parse_resource_like(statement, key): 441 if isinstance(statement[key], str): 442 if statement[key] != "*": 443 assert statement[key].count(":") >= 5 or "::" not in statement[key] 444 assert statement[key].split(":")[2] != "" 445 else: # list 446 for resource in statement[key]: 447 if resource != "*": 448 assert resource.count(":") >= 5 or "::" not in resource 449 assert resource[2] != "" 450 451 @staticmethod 452 def _legacy_parse_condition(condition_key, condition_value): 453 stripped_condition_key = IAMPolicyDocumentValidator._strip_condition_key( 454 condition_key 455 ) 456 457 if stripped_condition_key.startswith("Date"): 458 for ( 459 condition_element_key, 460 condition_element_value, 461 ) in condition_value.items(): 462 if isinstance(condition_element_value, str): 463 IAMPolicyDocumentValidator._legacy_parse_date_condition_value( 464 condition_element_value 465 ) 466 else: # it has to be a list 467 for date_condition_value in condition_element_value: 468 IAMPolicyDocumentValidator._legacy_parse_date_condition_value( 469 date_condition_value 470 ) 471 472 @staticmethod 473 def _legacy_parse_date_condition_value(date_condition_value): 474 if "t" in date_condition_value.lower() or "-" in date_condition_value: 475 IAMPolicyDocumentValidator._validate_iso_8601_datetime( 476 date_condition_value.lower() 477 ) 478 else: # timestamp 479 assert 0 <= int(date_condition_value) <= 9223372036854775807 480 481 @staticmethod 482 def _validate_iso_8601_datetime(datetime): 483 datetime_parts = datetime.partition("t") 484 negative_year = datetime_parts[0].startswith("-") 485 date_parts = ( 486 datetime_parts[0][1:].split("-") 487 if negative_year 488 else datetime_parts[0].split("-") 489 ) 490 year = "-" + date_parts[0] if negative_year else date_parts[0] 491 assert -292275054 <= int(year) <= 292278993 492 if len(date_parts) > 1: 493 month = date_parts[1] 494 assert 1 <= int(month) <= 12 495 if len(date_parts) > 2: 496 day = date_parts[2] 497 assert 1 <= int(day) <= 31 498 assert len(date_parts) < 4 499 500 time_parts = datetime_parts[2].split(":") 501 if time_parts[0] != "": 502 hours = time_parts[0] 503 assert 0 <= int(hours) <= 23 504 if len(time_parts) > 1: 505 minutes = time_parts[1] 506 assert 0 <= int(minutes) <= 59 507 if len(time_parts) > 2: 508 if "z" in time_parts[2]: 509 seconds_with_decimal_fraction = time_parts[2].partition("z")[0] 510 assert time_parts[2].partition("z")[2] == "" 511 elif "+" in time_parts[2]: 512 seconds_with_decimal_fraction = time_parts[2].partition("+")[0] 513 time_zone_data = time_parts[2].partition("+")[2].partition(":") 514 time_zone_hours = time_zone_data[0] 515 assert len(time_zone_hours) == 2 516 assert 0 <= int(time_zone_hours) <= 23 517 if time_zone_data[1] == ":": 518 time_zone_minutes = time_zone_data[2] 519 assert len(time_zone_minutes) == 2 520 assert 0 <= int(time_zone_minutes) <= 59 521 else: 522 seconds_with_decimal_fraction = time_parts[2] 523 seconds_with_decimal_fraction_partition = seconds_with_decimal_fraction.partition( 524 "." 525 ) 526 seconds = seconds_with_decimal_fraction_partition[0] 527 assert 0 <= int(seconds) <= 59 528 if seconds_with_decimal_fraction_partition[1] == ".": 529 decimal_seconds = seconds_with_decimal_fraction_partition[2] 530 assert 0 <= int(decimal_seconds) <= 999999999 531