import json
from datetime import datetime

from boto3 import Session
from moto.core.utils import iso_8601_datetime_with_milliseconds

from moto.iam.exceptions import IAMNotFoundException

from moto.iam import iam_backends

from moto.codepipeline.exceptions import (
    InvalidStructureException,
    PipelineNotFoundException,
    ResourceNotFoundException,
    InvalidTagsException,
    TooManyTagsException,
)
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel


class CodePipeline(BaseModel):
    """A single stored pipeline: its definition, tags, ARN and timestamps."""

    def __init__(self, region, pipeline):
        """Store ``pipeline`` (a dict with at least "name" and "stages").

        The dict is modified in place: its version is set to 1 and missing
        per-action defaults are filled in.
        """
        # the version number for a new pipeline is always 1
        pipeline["version"] = 1

        self.pipeline = self.add_default_values(pipeline)
        self.tags = {}

        self._arn = "arn:aws:codepipeline:{0}:{1}:{2}".format(
            region, ACCOUNT_ID, pipeline["name"]
        )
        # Take one timestamp so "created" and "updated" are identical for a
        # pipeline that has never been updated.
        now = datetime.utcnow()
        self._created = now
        self._updated = now

    @property
    def metadata(self):
        """Return the ARN plus the created/updated timestamps as strings."""
        return {
            "pipelineArn": self._arn,
            "created": iso_8601_datetime_with_milliseconds(self._created),
            "updated": iso_8601_datetime_with_milliseconds(self._updated),
        }

    def add_default_values(self, pipeline):
        """Fill in missing optional keys of every action and return ``pipeline``.

        The dict is updated in place; keys that are already present are left
        untouched.
        """
        for stage in pipeline["stages"]:
            for action in stage["actions"]:
                action.setdefault("runOrder", 1)
                action.setdefault("configuration", {})
                action.setdefault("outputArtifacts", [])
                action.setdefault("inputArtifacts", [])

        return pipeline

    def validate_tags(self, tags):
        """Check that ``tags`` (a list of {"key", "value"} dicts) may be applied.

        Raises:
            InvalidTagsException: a key starts with the reserved "aws:" prefix.
            TooManyTagsException: applying the tags would leave the pipeline
                with more than 50 distinct tag keys.
        """
        for tag in tags:
            if tag["key"].startswith("aws:"):
                raise InvalidTagsException(
                    "Not allowed to modify system tags. "
                    "System tags start with 'aws:'. "
                    "msg=[Caller is an end user and not allowed to mutate system tags]"
                )

        # Count distinct keys: overwriting an existing tag does not add one.
        resulting_keys = set(self.tags).union(tag["key"] for tag in tags)
        if len(resulting_keys) > 50:
            raise TooManyTagsException(self._arn)


class CodePipelineBackend(BaseBackend):
    """Holds all pipelines of one region, keyed by pipeline name."""

    def __init__(self):
        self.pipelines = {}

    @staticmethod
    def default_vpc_endpoint_service(service_region, zones):
        """Default VPC endpoint service."""
        return BaseBackend.default_vpc_endpoint_service_factory(
            service_region, zones, "codepipeline", policy_supported=False
        )

    @property
    def iam_backend(self):
        """Return the global IAM backend used to look up pipeline roles."""
        return iam_backends["global"]

    @staticmethod
    def _trusts_codepipeline(assume_role_policy_document):
        """Return True if any trust-policy statement names CodePipeline.

        ``assume_role_policy_document`` is the JSON text of the role's trust
        policy. Statements whose principal is not a service principal are
        skipped instead of raising.
        """
        statements = json.loads(assume_role_policy_document).get("Statement", [])
        if isinstance(statements, dict):
            # a policy may carry a single statement object instead of a list
            statements = [statements]

        for statement in statements:
            principal = statement.get("Principal")
            if not isinstance(principal, dict):
                continue
            services = principal.get("Service", [])
            if isinstance(services, str):
                services = [services]
            if "codepipeline.amazonaws.com" in services:
                return True

        return False

    def _get_pipeline_by_arn(self, arn):
        """Return the stored pipeline whose name is the last ARN segment.

        Raises:
            ResourceNotFoundException: no pipeline with that name is stored.
        """
        name = arn.split(":")[-1]
        pipeline = self.pipelines.get(name)

        if not pipeline:
            raise ResourceNotFoundException(
                "The account with id '{0}' does not include a pipeline with the name '{1}'".format(
                    ACCOUNT_ID, name
                )
            )

        return pipeline

    def create_pipeline(self, region, pipeline, tags):
        """Validate and store a new pipeline.

        Args:
            region: region name used to build the pipeline ARN.
            pipeline: pipeline definition dict ("name", "roleArn", "stages").
            tags: list of {"key", "value"} dicts, or None / empty for no tags.

        Returns:
            A tuple of the stored pipeline dict and the tags sorted by key.

        Raises:
            InvalidStructureException: the name is already taken, the role is
                unknown or does not trust CodePipeline, or there are fewer
                than two stages.
            InvalidTagsException, TooManyTagsException: see
                ``CodePipeline.validate_tags``.
        """
        name = pipeline["name"]
        if name in self.pipelines:
            raise InvalidStructureException(
                "A pipeline with the name '{0}' already exists in account '{1}'".format(
                    name, ACCOUNT_ID
                )
            )

        try:
            role = self.iam_backend.get_role_by_arn(pipeline["roleArn"])
            if not self._trusts_codepipeline(role.assume_role_policy_document):
                raise IAMNotFoundException("")
        except IAMNotFoundException:
            raise InvalidStructureException(
                "CodePipeline is not authorized to perform AssumeRole on role {}".format(
                    pipeline["roleArn"]
                )
            )

        stage_count = len(pipeline["stages"])
        if stage_count < 2:
            raise InvalidStructureException(
                "Pipeline has only {0} stage(s). There should be a minimum of 2 stages in a pipeline".format(
                    stage_count
                )
            )

        codepipeline = CodePipeline(region, pipeline)

        # Callers may pass None when the request carries no tags.
        tags = tags or []
        if tags:
            codepipeline.validate_tags(tags)
            codepipeline.tags.update({tag["key"]: tag["value"] for tag in tags})

        # Register only after every check passed, so a rejected request does
        # not leave a half-created pipeline behind.
        self.pipelines[name] = codepipeline

        return pipeline, sorted(tags, key=lambda i: i["key"])

    def get_pipeline(self, name):
        """Return ``(pipeline dict, metadata dict)`` for the named pipeline.

        Raises:
            PipelineNotFoundException: no pipeline with that name is stored.
        """
        codepipeline = self.pipelines.get(name)

        if not codepipeline:
            raise PipelineNotFoundException(
                "Account '{0}' does not have a pipeline with name '{1}'".format(
                    ACCOUNT_ID, name
                )
            )

        return codepipeline.pipeline, codepipeline.metadata

    def update_pipeline(self, pipeline):
        """Replace the definition of an existing pipeline and return it.

        Raises:
            ResourceNotFoundException: no pipeline with that name is stored.
        """
        codepipeline = self.pipelines.get(pipeline["name"])

        if not codepipeline:
            raise ResourceNotFoundException(
                "The account with id '{0}' does not include a pipeline with the name '{1}'".format(
                    ACCOUNT_ID, pipeline["name"]
                )
            )

        # version number is auto incremented
        pipeline["version"] = codepipeline.pipeline["version"] + 1
        codepipeline._updated = datetime.utcnow()
        codepipeline.pipeline = codepipeline.add_default_values(pipeline)

        return codepipeline.pipeline

    def list_pipelines(self):
        """Return name/version/created/updated summaries sorted by name."""
        pipelines = []

        for name, codepipeline in self.pipelines.items():
            metadata = codepipeline.metadata
            pipelines.append(
                {
                    "name": name,
                    "version": codepipeline.pipeline["version"],
                    "created": metadata["created"],
                    "updated": metadata["updated"],
                }
            )

        return sorted(pipelines, key=lambda i: i["name"])

    def delete_pipeline(self, name):
        """Remove the named pipeline; unknown names are ignored."""
        self.pipelines.pop(name, None)

    def list_tags_for_resource(self, arn):
        """Return the pipeline's tags as {"key", "value"} dicts sorted by key.

        Raises:
            ResourceNotFoundException: the ARN names no stored pipeline.
        """
        pipeline = self._get_pipeline_by_arn(arn)

        tags = [{"key": key, "value": value} for key, value in pipeline.tags.items()]

        return sorted(tags, key=lambda i: i["key"])

    def tag_resource(self, arn, tags):
        """Add or overwrite tags on the pipeline identified by ``arn``.

        Raises:
            ResourceNotFoundException: the ARN names no stored pipeline.
            InvalidTagsException, TooManyTagsException: see
                ``CodePipeline.validate_tags``.
        """
        pipeline = self._get_pipeline_by_arn(arn)

        pipeline.validate_tags(tags)

        pipeline.tags.update({tag["key"]: tag["value"] for tag in tags})

    def untag_resource(self, arn, tag_keys):
        """Remove the given tag keys; keys that are not set are ignored.

        Raises:
            ResourceNotFoundException: the ARN names no stored pipeline.
        """
        pipeline = self._get_pipeline_by_arn(arn)

        for key in tag_keys:
            pipeline.tags.pop(key, None)


# One backend per region, across the standard, GovCloud and China partitions.
codepipeline_backends = {}
_session = Session()
for _partition in ("aws", "aws-us-gov", "aws-cn"):
    for region in _session.get_available_regions(
        "codepipeline", partition_name=_partition
    ):
        codepipeline_backends[region] = CodePipelineBackend()