1from boto3 import Session 2from moto.core import ACCOUNT_ID, BaseBackend, BaseModel 3import random 4import string 5 6 7class Pipeline(BaseModel): 8 def __init__( 9 self, 10 region, 11 name, 12 input_bucket, 13 output_bucket, 14 role, 15 content_config, 16 thumbnail_config, 17 ): 18 a = "".join(random.choice(string.digits) for _ in range(13)) 19 b = "".join(random.choice(string.ascii_lowercase) for _ in range(6)) 20 self.id = "{}-{}".format(a, b) 21 self.name = name 22 self.arn = "arn:aws:elastictranscoder:{}:{}:pipeline/{}".format( 23 region, ACCOUNT_ID, self.id 24 ) 25 self.status = "Active" 26 self.input_bucket = input_bucket 27 self.output_bucket = output_bucket or content_config["Bucket"] 28 self.role = role 29 self.content_config = content_config or {"Bucket": self.output_bucket} 30 if "Permissions" not in self.content_config: 31 self.content_config["Permissions"] = [] 32 self.thumbnail_config = thumbnail_config or {"Bucket": self.output_bucket} 33 if "Permissions" not in self.thumbnail_config: 34 self.thumbnail_config["Permissions"] = [] 35 36 def update(self, name, input_bucket, role): 37 if name: 38 self.name = name 39 if input_bucket: 40 self.input_bucket = input_bucket 41 if role: 42 self.role = role 43 44 def to_dict(self): 45 return { 46 "Id": self.id, 47 "Name": self.name, 48 "Arn": self.arn, 49 "Status": self.status, 50 "InputBucket": self.input_bucket, 51 "OutputBucket": self.output_bucket, 52 "Role": self.role, 53 "Notifications": { 54 "Progressing": "", 55 "Completed": "", 56 "Warning": "", 57 "Error": "", 58 }, 59 "ContentConfig": self.content_config, 60 "ThumbnailConfig": self.thumbnail_config, 61 } 62 63 64class ElasticTranscoderBackend(BaseBackend): 65 def __init__(self, region_name=None): 66 super(ElasticTranscoderBackend, self).__init__() 67 self.region_name = region_name 68 self.pipelines = {} 69 70 def reset(self): 71 region_name = self.region_name 72 self.__dict__ = {} 73 self.__init__(region_name) 74 75 def create_pipeline( 76 self, 77 name, 78 input_bucket, 79 output_bucket, 80 role, 81 aws_kms_key_arn, 82 notifications, 83 content_config, 84 thumbnail_config, 85 ): 86 pipeline = Pipeline( 87 self.region_name, 88 name, 89 input_bucket, 90 output_bucket, 91 role, 92 content_config, 93 thumbnail_config, 94 ) 95 self.pipelines[pipeline.id] = pipeline 96 warnings = [] 97 return pipeline, warnings 98 99 def list_pipelines(self): 100 return [p.to_dict() for _, p in self.pipelines.items()] 101 102 def read_pipeline(self, id): 103 return self.pipelines[id] 104 105 def update_pipeline( 106 self, 107 id, 108 name, 109 input_bucket, 110 role, 111 aws_kms_key_arn, 112 notifications, 113 content_config, 114 thumbnail_config, 115 ): 116 pipeline = self.read_pipeline(id) 117 pipeline.update(name, input_bucket, role) 118 warnings = [] 119 return pipeline, warnings 120 121 def delete_pipeline(self, pipeline_id): 122 self.pipelines.pop(pipeline_id) 123 124 125elastictranscoder_backends = {} 126for region in Session().get_available_regions("elastictranscoder"): 127 elastictranscoder_backends[region] = ElasticTranscoderBackend(region) 128for region in Session().get_available_regions( 129 "elastictranscoder", partition_name="aws-us-gov" 130): 131 elastictranscoder_backends[region] = ElasticTranscoderBackend(region) 132for region in Session().get_available_regions( 133 "elastictranscoder", partition_name="aws-cn" 134): 135 elastictranscoder_backends[region] = ElasticTranscoderBackend(region) 136