1import json
2import re
3
4from moto.iam.exceptions import MalformedPolicyDocument
5
6
# Keys accepted at the top level of a policy document.
VALID_TOP_ELEMENTS = ["Version", "Id", "Statement", "Conditions"]

# Policy language versions accepted by the syntax check.
VALID_VERSIONS = ["2008-10-17", "2012-10-17"]

# Keys accepted inside a single statement.
VALID_STATEMENT_ELEMENTS = [
    "Sid",
    "Action", "NotAction",
    "Resource", "NotResource",
    "Effect",
    "Principal", "NotPrincipal",
    "Condition",
]

# Effect values; the syntax check compares case-insensitively, the legacy
# parsing step compares case-sensitively.
VALID_EFFECTS = ["Allow", "Deny"]

# Condition operator names, compared after prefixes/postfixes are stripped.
VALID_CONDITIONS = [
    # string operators
    "StringEquals", "StringNotEquals",
    "StringEqualsIgnoreCase", "StringNotEqualsIgnoreCase",
    "StringLike", "StringNotLike",
    # numeric operators
    "NumericEquals", "NumericNotEquals",
    "NumericLessThan", "NumericLessThanEquals",
    "NumericGreaterThan", "NumericGreaterThanEquals",
    # date operators
    "DateEquals", "DateNotEquals",
    "DateLessThan", "DateLessThanEquals",
    "DateGreaterThan", "DateGreaterThanEquals",
    # boolean / binary operators
    "Bool", "BinaryEquals",
    # IP address operators
    "IpAddress", "NotIpAddress",
    # ARN operators
    "ArnEquals", "ArnLike", "ArnNotEquals", "ArnNotLike",
    # key-existence operator
    "Null",
]

# Stripped from the front of a condition key before it is looked up.
VALID_CONDITION_PREFIXES = ["ForAnyValue:", "ForAllValues:"]

# Stripped from the end of a condition key before it is looked up.
VALID_CONDITION_POSTFIXES = ["IfExists"]

# Per-service error message used when a resource of that service carries
# something in the region field; ``{resource}`` is filled with the resource.
SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS = dict(
    iam="IAM resource {resource} cannot contain region information.",
    s3="Resource {resource} can not contain region information.",
)

# Per-service list of prefixes the resource path has to start with, plus the
# error message template (``{values}`` receives the joined prefix list).
VALID_RESOURCE_PATH_STARTING_VALUES = {
    "iam": {
        "values": [
            "user/", "federated-user/", "role/", "group/",
            "instance-profile/", "mfa/", "server-certificate/", "policy/",
            "sms-mfa/", "saml-provider/", "oidc-provider/",
            "report/", "access-report/",
        ],
        "error_message": 'IAM resource path must either be "*" or start with {values}.',
    }
}
84
85
86class IAMPolicyDocumentValidator:
87    def __init__(self, policy_document):
88        self._policy_document = policy_document
89        self._policy_json = {}
90        self._statements = []
91        self._resource_error = ""  # the first resource error found that does not generate a legacy parsing error
92
93    def validate(self):
94        try:
95            self._validate_syntax()
96        except Exception:
97            raise MalformedPolicyDocument("Syntax errors in policy.")
98        try:
99            self._validate_version()
100        except Exception:
101            raise MalformedPolicyDocument(
102                "Policy document must be version 2012-10-17 or greater."
103            )
104        try:
105            self._perform_first_legacy_parsing()
106            self._validate_resources_for_formats()
107            self._validate_not_resources_for_formats()
108        except Exception:
109            raise MalformedPolicyDocument("The policy failed legacy parsing")
110        try:
111            self._validate_sid_uniqueness()
112        except Exception:
113            raise MalformedPolicyDocument(
114                "Statement IDs (SID) in a single policy must be unique."
115            )
116        try:
117            self._validate_action_like_exist()
118        except Exception:
119            raise MalformedPolicyDocument("Policy statement must contain actions.")
120        try:
121            self._validate_resource_exist()
122        except Exception:
123            raise MalformedPolicyDocument("Policy statement must contain resources.")
124
125        if self._resource_error != "":
126            raise MalformedPolicyDocument(self._resource_error)
127
128        self._validate_actions_for_prefixes()
129        self._validate_not_actions_for_prefixes()
130
131    def _validate_syntax(self):
132        self._policy_json = json.loads(self._policy_document)
133        assert isinstance(self._policy_json, dict)
134        self._validate_top_elements()
135        self._validate_version_syntax()
136        self._validate_id_syntax()
137        self._validate_statements_syntax()
138
139    def _validate_top_elements(self):
140        top_elements = self._policy_json.keys()
141        for element in top_elements:
142            assert element in VALID_TOP_ELEMENTS
143
144    def _validate_version_syntax(self):
145        if "Version" in self._policy_json:
146            assert self._policy_json["Version"] in VALID_VERSIONS
147
148    def _validate_version(self):
149        assert self._policy_json["Version"] == "2012-10-17"
150
151    def _validate_sid_uniqueness(self):
152        sids = []
153        for statement in self._statements:
154            if "Sid" in statement:
155                statementId = statement["Sid"]
156                if statementId:
157                    assert statementId not in sids
158                    sids.append(statementId)
159
160    def _validate_statements_syntax(self):
161        assert "Statement" in self._policy_json
162        assert isinstance(self._policy_json["Statement"], (dict, list))
163
164        if isinstance(self._policy_json["Statement"], dict):
165            self._statements.append(self._policy_json["Statement"])
166        else:
167            self._statements += self._policy_json["Statement"]
168
169        assert self._statements
170        for statement in self._statements:
171            self._validate_statement_syntax(statement)
172
173    @staticmethod
174    def _validate_statement_syntax(statement):
175        assert isinstance(statement, dict)
176        for statement_element in statement.keys():
177            assert statement_element in VALID_STATEMENT_ELEMENTS
178
179        assert "Resource" not in statement or "NotResource" not in statement
180        assert "Action" not in statement or "NotAction" not in statement
181
182        IAMPolicyDocumentValidator._validate_effect_syntax(statement)
183        IAMPolicyDocumentValidator._validate_action_syntax(statement)
184        IAMPolicyDocumentValidator._validate_not_action_syntax(statement)
185        IAMPolicyDocumentValidator._validate_resource_syntax(statement)
186        IAMPolicyDocumentValidator._validate_not_resource_syntax(statement)
187        IAMPolicyDocumentValidator._validate_condition_syntax(statement)
188        IAMPolicyDocumentValidator._validate_sid_syntax(statement)
189
190    @staticmethod
191    def _validate_effect_syntax(statement):
192        assert "Effect" in statement
193        assert isinstance(statement["Effect"], str)
194        assert statement["Effect"].lower() in [
195            allowed_effect.lower() for allowed_effect in VALID_EFFECTS
196        ]
197
198    @staticmethod
199    def _validate_action_syntax(statement):
200        IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(
201            statement, "Action"
202        )
203
204    @staticmethod
205    def _validate_not_action_syntax(statement):
206        IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(
207            statement, "NotAction"
208        )
209
210    @staticmethod
211    def _validate_resource_syntax(statement):
212        IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(
213            statement, "Resource"
214        )
215
216    @staticmethod
217    def _validate_not_resource_syntax(statement):
218        IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(
219            statement, "NotResource"
220        )
221
222    @staticmethod
223    def _validate_string_or_list_of_strings_syntax(statement, key):
224        if key in statement:
225            assert isinstance(statement[key], (str, list))
226            if isinstance(statement[key], list):
227                for resource in statement[key]:
228                    assert isinstance(resource, str)
229
230    @staticmethod
231    def _validate_condition_syntax(statement):
232        if "Condition" in statement:
233            assert isinstance(statement["Condition"], dict)
234            for condition_key, condition_value in statement["Condition"].items():
235                assert isinstance(condition_value, dict)
236                for (
237                    condition_element_key,
238                    condition_element_value,
239                ) in condition_value.items():
240                    assert isinstance(condition_element_value, (list, str))
241
242                if (
243                    IAMPolicyDocumentValidator._strip_condition_key(condition_key)
244                    not in VALID_CONDITIONS
245                ):
246                    assert not condition_value  # empty dict
247
248    @staticmethod
249    def _strip_condition_key(condition_key):
250        for valid_prefix in VALID_CONDITION_PREFIXES:
251            if condition_key.startswith(valid_prefix):
252                condition_key = condition_key[len(valid_prefix) :]
253                break  # strip only the first match
254
255        for valid_postfix in VALID_CONDITION_POSTFIXES:
256            if condition_key.endswith(valid_postfix):
257                condition_key = condition_key[: -len(valid_postfix)]
258                break  # strip only the first match
259
260        return condition_key
261
262    @staticmethod
263    def _validate_sid_syntax(statement):
264        if "Sid" in statement:
265            assert isinstance(statement["Sid"], str)
266
267    def _validate_id_syntax(self):
268        if "Id" in self._policy_json:
269            assert isinstance(self._policy_json["Id"], str)
270
271    def _validate_resource_exist(self):
272        for statement in self._statements:
273            assert "Resource" in statement or "NotResource" in statement
274            if "Resource" in statement and isinstance(statement["Resource"], list):
275                assert statement["Resource"]
276            elif "NotResource" in statement and isinstance(
277                statement["NotResource"], list
278            ):
279                assert statement["NotResource"]
280
281    def _validate_action_like_exist(self):
282        for statement in self._statements:
283            assert "Action" in statement or "NotAction" in statement
284            if "Action" in statement and isinstance(statement["Action"], list):
285                assert statement["Action"]
286            elif "NotAction" in statement and isinstance(statement["NotAction"], list):
287                assert statement["NotAction"]
288
289    def _validate_actions_for_prefixes(self):
290        self._validate_action_like_for_prefixes("Action")
291
292    def _validate_not_actions_for_prefixes(self):
293        self._validate_action_like_for_prefixes("NotAction")
294
295    def _validate_action_like_for_prefixes(self, key):
296        for statement in self._statements:
297            if key in statement:
298                if isinstance(statement[key], str):
299                    self._validate_action_prefix(statement[key])
300                else:
301                    for action in statement[key]:
302                        self._validate_action_prefix(action)
303
304    @staticmethod
305    def _validate_action_prefix(action):
306        action_parts = action.split(":")
307        if len(action_parts) == 1 and action_parts[0] != "*":
308            raise MalformedPolicyDocument(
309                "Actions/Conditions must be prefaced by a vendor, e.g., iam, sdb, ec2, etc."
310            )
311        elif len(action_parts) > 2:
312            raise MalformedPolicyDocument(
313                "Actions/Condition can contain only one colon."
314            )
315
316        vendor_pattern = re.compile(r"[^a-zA-Z0-9\-.]")
317        if action_parts[0] != "*" and vendor_pattern.search(action_parts[0]):
318            raise MalformedPolicyDocument(
319                "Vendor {vendor} is not valid".format(vendor=action_parts[0])
320            )
321
322    def _validate_resources_for_formats(self):
323        self._validate_resource_like_for_formats("Resource")
324
325    def _validate_not_resources_for_formats(self):
326        self._validate_resource_like_for_formats("NotResource")
327
328    def _validate_resource_like_for_formats(self, key):
329        for statement in self._statements:
330            if key in statement:
331                if isinstance(statement[key], str):
332                    self._validate_resource_format(statement[key])
333                else:
334                    for resource in sorted(statement[key], reverse=True):
335                        self._validate_resource_format(resource)
336                if self._resource_error == "":
337                    IAMPolicyDocumentValidator._legacy_parse_resource_like(
338                        statement, key
339                    )
340
341    def _validate_resource_format(self, resource):
342        if resource != "*":
343            resource_partitions = resource.partition(":")
344
345            if resource_partitions[1] == "":
346                self._resource_error = 'Resource {resource} must be in ARN format or "*".'.format(
347                    resource=resource
348                )
349                return
350
351            resource_partitions = resource_partitions[2].partition(":")
352            if resource_partitions[0] not in ["aws", "*"]:
353                remaining_resource_parts = resource_partitions[2].split(":")
354
355                arn1 = (
356                    remaining_resource_parts[0]
357                    if remaining_resource_parts[0] != ""
358                    or len(remaining_resource_parts) > 1
359                    else "*"
360                )
361                arn2 = (
362                    remaining_resource_parts[1]
363                    if len(remaining_resource_parts) > 1
364                    else "*"
365                )
366                arn3 = (
367                    remaining_resource_parts[2]
368                    if len(remaining_resource_parts) > 2
369                    else "*"
370                )
371                arn4 = (
372                    ":".join(remaining_resource_parts[3:])
373                    if len(remaining_resource_parts) > 3
374                    else "*"
375                )
376                self._resource_error = 'Partition "{partition}" is not valid for resource "arn:{partition}:{arn1}:{arn2}:{arn3}:{arn4}".'.format(
377                    partition=resource_partitions[0],
378                    arn1=arn1,
379                    arn2=arn2,
380                    arn3=arn3,
381                    arn4=arn4,
382                )
383                return
384
385            if resource_partitions[1] != ":":
386                self._resource_error = "Resource vendor must be fully qualified and cannot contain regexes."
387                return
388
389            resource_partitions = resource_partitions[2].partition(":")
390
391            service = resource_partitions[0]
392
393            if service in SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS.keys() and not resource_partitions[
394                2
395            ].startswith(
396                ":"
397            ):
398                self._resource_error = SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS[
399                    service
400                ].format(
401                    resource=resource
402                )
403                return
404
405            resource_partitions = resource_partitions[2].partition(":")
406            resource_partitions = resource_partitions[2].partition(":")
407
408            if service in VALID_RESOURCE_PATH_STARTING_VALUES.keys():
409                valid_start = False
410                for valid_starting_value in VALID_RESOURCE_PATH_STARTING_VALUES[
411                    service
412                ]["values"]:
413                    if resource_partitions[2].startswith(valid_starting_value):
414                        valid_start = True
415                        break
416                if not valid_start:
417                    self._resource_error = VALID_RESOURCE_PATH_STARTING_VALUES[service][
418                        "error_message"
419                    ].format(
420                        values=", ".join(
421                            VALID_RESOURCE_PATH_STARTING_VALUES[service]["values"]
422                        )
423                    )
424
425    def _perform_first_legacy_parsing(self):
426        """This method excludes legacy parsing resources, since that have to be done later."""
427        for statement in self._statements:
428            self._legacy_parse_statement(statement)
429
430    @staticmethod
431    def _legacy_parse_statement(statement):
432        assert statement["Effect"] in VALID_EFFECTS  # case-sensitive matching
433        if "Condition" in statement:
434            for condition_key, condition_value in statement["Condition"].items():
435                IAMPolicyDocumentValidator._legacy_parse_condition(
436                    condition_key, condition_value
437                )
438
439    @staticmethod
440    def _legacy_parse_resource_like(statement, key):
441        if isinstance(statement[key], str):
442            if statement[key] != "*":
443                assert statement[key].count(":") >= 5 or "::" not in statement[key]
444                assert statement[key].split(":")[2] != ""
445        else:  # list
446            for resource in statement[key]:
447                if resource != "*":
448                    assert resource.count(":") >= 5 or "::" not in resource
449                    assert resource[2] != ""
450
451    @staticmethod
452    def _legacy_parse_condition(condition_key, condition_value):
453        stripped_condition_key = IAMPolicyDocumentValidator._strip_condition_key(
454            condition_key
455        )
456
457        if stripped_condition_key.startswith("Date"):
458            for (
459                condition_element_key,
460                condition_element_value,
461            ) in condition_value.items():
462                if isinstance(condition_element_value, str):
463                    IAMPolicyDocumentValidator._legacy_parse_date_condition_value(
464                        condition_element_value
465                    )
466                else:  # it has to be a list
467                    for date_condition_value in condition_element_value:
468                        IAMPolicyDocumentValidator._legacy_parse_date_condition_value(
469                            date_condition_value
470                        )
471
472    @staticmethod
473    def _legacy_parse_date_condition_value(date_condition_value):
474        if "t" in date_condition_value.lower() or "-" in date_condition_value:
475            IAMPolicyDocumentValidator._validate_iso_8601_datetime(
476                date_condition_value.lower()
477            )
478        else:  # timestamp
479            assert 0 <= int(date_condition_value) <= 9223372036854775807
480
481    @staticmethod
482    def _validate_iso_8601_datetime(datetime):
483        datetime_parts = datetime.partition("t")
484        negative_year = datetime_parts[0].startswith("-")
485        date_parts = (
486            datetime_parts[0][1:].split("-")
487            if negative_year
488            else datetime_parts[0].split("-")
489        )
490        year = "-" + date_parts[0] if negative_year else date_parts[0]
491        assert -292275054 <= int(year) <= 292278993
492        if len(date_parts) > 1:
493            month = date_parts[1]
494            assert 1 <= int(month) <= 12
495        if len(date_parts) > 2:
496            day = date_parts[2]
497            assert 1 <= int(day) <= 31
498        assert len(date_parts) < 4
499
500        time_parts = datetime_parts[2].split(":")
501        if time_parts[0] != "":
502            hours = time_parts[0]
503            assert 0 <= int(hours) <= 23
504        if len(time_parts) > 1:
505            minutes = time_parts[1]
506            assert 0 <= int(minutes) <= 59
507        if len(time_parts) > 2:
508            if "z" in time_parts[2]:
509                seconds_with_decimal_fraction = time_parts[2].partition("z")[0]
510                assert time_parts[2].partition("z")[2] == ""
511            elif "+" in time_parts[2]:
512                seconds_with_decimal_fraction = time_parts[2].partition("+")[0]
513                time_zone_data = time_parts[2].partition("+")[2].partition(":")
514                time_zone_hours = time_zone_data[0]
515                assert len(time_zone_hours) == 2
516                assert 0 <= int(time_zone_hours) <= 23
517                if time_zone_data[1] == ":":
518                    time_zone_minutes = time_zone_data[2]
519                    assert len(time_zone_minutes) == 2
520                    assert 0 <= int(time_zone_minutes) <= 59
521            else:
522                seconds_with_decimal_fraction = time_parts[2]
523            seconds_with_decimal_fraction_partition = seconds_with_decimal_fraction.partition(
524                "."
525            )
526            seconds = seconds_with_decimal_fraction_partition[0]
527            assert 0 <= int(seconds) <= 59
528            if seconds_with_decimal_fraction_partition[1] == ".":
529                decimal_seconds = seconds_with_decimal_fraction_partition[2]
530                assert 0 <= int(decimal_seconds) <= 999999999
531