1import json
2from datetime import datetime
3
4from boto3 import Session
5from moto.core.utils import iso_8601_datetime_with_milliseconds
6
7from moto.iam.exceptions import IAMNotFoundException
8
9from moto.iam import iam_backends
10
11from moto.codepipeline.exceptions import (
12    InvalidStructureException,
13    PipelineNotFoundException,
14    ResourceNotFoundException,
15    InvalidTagsException,
16    TooManyTagsException,
17)
18from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
19
20
class CodePipeline(BaseModel):
    """In-memory model of one pipeline: its structure, tags and metadata."""

    # Maximum number of distinct tag keys validate_tags allows on a pipeline.
    MAX_TAGS = 50

    def __init__(self, region, pipeline):
        """Build the model for a newly created pipeline.

        :param region: region name used to build the pipeline ARN.
        :param pipeline: pipeline structure dict. It is modified in place
            (``version`` is set and per-action defaults are filled in) and
            then kept as ``self.pipeline``.
        """
        # the version number for a new pipeline is always 1
        pipeline["version"] = 1

        self.pipeline = self.add_default_values(pipeline)
        self.tags = {}

        self._arn = "arn:aws:codepipeline:{0}:{1}:{2}".format(
            region, ACCOUNT_ID, pipeline["name"]
        )
        # Use a single timestamp so that a freshly created pipeline reports
        # exactly the same "created" and "updated" values.
        now = datetime.utcnow()
        self._created = now
        self._updated = now

    @property
    def metadata(self):
        """Return the ARN plus the formatted created/updated timestamps."""
        return {
            "pipelineArn": self._arn,
            "created": iso_8601_datetime_with_milliseconds(self._created),
            "updated": iso_8601_datetime_with_milliseconds(self._updated),
        }

    def add_default_values(self, pipeline):
        """Fill in optional per-action fields that are missing.

        Every action of every stage gets ``runOrder`` (1), ``configuration``
        ({}), ``outputArtifacts`` ([]) and ``inputArtifacts`` ([]) unless the
        key is already present. The dict is changed in place and returned.
        """
        for stage in pipeline["stages"]:
            for action in stage["actions"]:
                action.setdefault("runOrder", 1)
                action.setdefault("configuration", {})
                action.setdefault("outputArtifacts", [])
                action.setdefault("inputArtifacts", [])

        return pipeline

    def validate_tags(self, tags):
        """Check that ``tags`` may be applied to this pipeline.

        :param tags: iterable of ``{"key": ..., "value": ...}`` dicts.
        :raises InvalidTagsException: if any key starts with ``aws:``.
        :raises TooManyTagsException: if applying the tags would leave the
            pipeline with more than ``MAX_TAGS`` distinct keys.
        """
        for tag in tags:
            if tag["key"].startswith("aws:"):
                raise InvalidTagsException(
                    "Not allowed to modify system tags. "
                    "System tags start with 'aws:'. "
                    "msg=[Caller is an end user and not allowed to mutate system tags]"
                )

        # Count the distinct keys the pipeline would end up with. Overwriting
        # an existing tag (or repeating a key within the request) does not
        # create an additional tag, so it must not count towards the limit.
        resulting_keys = set(self.tags)
        resulting_keys.update(tag["key"] for tag in tags)
        if len(resulting_keys) > self.MAX_TAGS:
            raise TooManyTagsException(self._arn)
68
69
class CodePipelineBackend(BaseBackend):
    """In-memory CodePipeline backend; pipelines are stored by name."""

    def __init__(self):
        # Maps pipeline name -> CodePipeline model.
        self.pipelines = {}

    @staticmethod
    def default_vpc_endpoint_service(service_region, zones):
        """Default VPC endpoint service."""
        return BaseBackend.default_vpc_endpoint_service_factory(
            service_region, zones, "codepipeline", policy_supported=False
        )

    @property
    def iam_backend(self):
        """Return the global IAM backend used to look up pipeline roles."""
        return iam_backends["global"]

    @staticmethod
    def _trusts_codepipeline(policy_document):
        """Return True if the trust policy lists CodePipeline as a service principal.

        ``policy_document`` is the JSON text of a role's assume-role policy.
        Documents that cannot be parsed, or that do not have the expected
        Statement/Principal/Service layout, are treated as not trusting
        CodePipeline rather than raising.
        """
        try:
            statements = json.loads(policy_document)["Statement"]
        except (KeyError, TypeError, ValueError):
            return False

        # A policy may carry a single statement object instead of a list.
        if isinstance(statements, dict):
            statements = [statements]
        if not isinstance(statements, list):
            return False

        for statement in statements:
            try:
                if "codepipeline.amazonaws.com" in statement["Principal"]["Service"]:
                    return True
            except (KeyError, TypeError):
                # Statement without a service principal (e.g. an account or
                # wildcard principal): it cannot grant access to CodePipeline.
                continue
        return False

    def _get_pipeline_by_arn(self, arn):
        """Return the stored pipeline whose name is the last ``:`` segment of ``arn``.

        :raises ResourceNotFoundException: if no pipeline has that name.
        """
        name = arn.split(":")[-1]
        pipeline = self.pipelines.get(name)

        if pipeline is None:
            raise ResourceNotFoundException(
                "The account with id '{0}' does not include a pipeline with the name '{1}'".format(
                    ACCOUNT_ID, name
                )
            )

        return pipeline

    def create_pipeline(self, region, pipeline, tags):
        """Validate and store a new pipeline.

        :param region: region name, used for the pipeline ARN.
        :param pipeline: pipeline structure dict (needs ``name``, ``roleArn``
            and ``stages``); it is modified in place by the model.
        :param tags: optional list of ``{"key": ..., "value": ...}`` dicts;
            ``None`` is treated as no tags.
        :returns: tuple of the pipeline dict and the tags sorted by key.
        :raises InvalidStructureException: if the name is already taken, the
            role is unknown or does not trust CodePipeline, or there are
            fewer than two stages.
        :raises InvalidTagsException, TooManyTagsException: propagated from
            tag validation; in that case nothing is stored.
        """
        name = pipeline["name"]
        if name in self.pipelines:
            raise InvalidStructureException(
                "A pipeline with the name '{0}' already exists in account '{1}'".format(
                    name, ACCOUNT_ID
                )
            )

        try:
            role = self.iam_backend.get_role_by_arn(pipeline["roleArn"])
        except IAMNotFoundException:
            role = None
        if role is None or not self._trusts_codepipeline(
            role.assume_role_policy_document
        ):
            raise InvalidStructureException(
                "CodePipeline is not authorized to perform AssumeRole on role {}".format(
                    pipeline["roleArn"]
                )
            )

        stage_count = len(pipeline["stages"])
        if stage_count < 2:
            raise InvalidStructureException(
                "Pipeline has only {0} stage(s). There should be a minimum of 2 stages in a pipeline".format(
                    stage_count
                )
            )

        codepipeline = CodePipeline(region, pipeline)

        # Tags are optional: normalise a missing value to an empty list so
        # the sorted() call below cannot fail on None.
        tags = tags or []
        if tags:
            # Validate before registering the pipeline so that a rejected
            # tag set does not leave a half-created pipeline behind.
            codepipeline.validate_tags(tags)
            codepipeline.tags.update({tag["key"]: tag["value"] for tag in tags})

        self.pipelines[name] = codepipeline

        return pipeline, sorted(tags, key=lambda i: i["key"])

    def get_pipeline(self, name):
        """Return ``(pipeline dict, metadata dict)`` for the named pipeline.

        :raises PipelineNotFoundException: if no pipeline has that name.
        """
        codepipeline = self.pipelines.get(name)

        if not codepipeline:
            raise PipelineNotFoundException(
                "Account '{0}' does not have a pipeline with name '{1}'".format(
                    ACCOUNT_ID, name
                )
            )

        return codepipeline.pipeline, codepipeline.metadata

    def update_pipeline(self, pipeline):
        """Replace the structure of an existing pipeline and bump its version.

        :param pipeline: new pipeline structure dict; modified in place.
        :returns: the stored pipeline dict.
        :raises ResourceNotFoundException: if no pipeline has that name.
        """
        codepipeline = self.pipelines.get(pipeline["name"])

        if not codepipeline:
            raise ResourceNotFoundException(
                "The account with id '{0}' does not include a pipeline with the name '{1}'".format(
                    ACCOUNT_ID, pipeline["name"]
                )
            )

        # version number is auto incremented
        pipeline["version"] = codepipeline.pipeline["version"] + 1
        codepipeline._updated = datetime.utcnow()
        codepipeline.pipeline = codepipeline.add_default_values(pipeline)

        return codepipeline.pipeline

    def list_pipelines(self):
        """Return name/version/created/updated summaries, sorted by name."""
        pipelines = [
            {
                "name": name,
                "version": codepipeline.pipeline["version"],
                "created": codepipeline.metadata["created"],
                "updated": codepipeline.metadata["updated"],
            }
            for name, codepipeline in self.pipelines.items()
        ]

        return sorted(pipelines, key=lambda i: i["name"])

    def delete_pipeline(self, name):
        """Remove the named pipeline; an unknown name is silently ignored."""
        self.pipelines.pop(name, None)

    def list_tags_for_resource(self, arn):
        """Return the pipeline's tags as key/value dicts sorted by key.

        :raises ResourceNotFoundException: if the ARN names no pipeline.
        """
        pipeline = self._get_pipeline_by_arn(arn)

        tags = [{"key": key, "value": value} for key, value in pipeline.tags.items()]

        return sorted(tags, key=lambda i: i["key"])

    def tag_resource(self, arn, tags):
        """Validate ``tags`` and add them to (or overwrite them on) the pipeline.

        :raises ResourceNotFoundException: if the ARN names no pipeline.
        """
        pipeline = self._get_pipeline_by_arn(arn)

        pipeline.validate_tags(tags)

        pipeline.tags.update({tag["key"]: tag["value"] for tag in tags})

    def untag_resource(self, arn, tag_keys):
        """Remove the given tag keys from the pipeline; unknown keys are ignored.

        :raises ResourceNotFoundException: if the ARN names no pipeline.
        """
        pipeline = self._get_pipeline_by_arn(arn)

        for key in tag_keys:
            pipeline.tags.pop(key, None)
213
214
# Registry of backends: one independent CodePipelineBackend per region name,
# covering the standard, GovCloud and China partitions in that order.
codepipeline_backends = {}
for _partition in ("aws", "aws-us-gov", "aws-cn"):
    for region in Session().get_available_regions(
        "codepipeline", partition_name=_partition
    ):
        codepipeline_backends[region] = CodePipelineBackend()
224