1from boto3 import Session
2from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
3import random
4import string
5
6
7class Pipeline(BaseModel):
8    def __init__(
9        self,
10        region,
11        name,
12        input_bucket,
13        output_bucket,
14        role,
15        content_config,
16        thumbnail_config,
17    ):
18        a = "".join(random.choice(string.digits) for _ in range(13))
19        b = "".join(random.choice(string.ascii_lowercase) for _ in range(6))
20        self.id = "{}-{}".format(a, b)
21        self.name = name
22        self.arn = "arn:aws:elastictranscoder:{}:{}:pipeline/{}".format(
23            region, ACCOUNT_ID, self.id
24        )
25        self.status = "Active"
26        self.input_bucket = input_bucket
27        self.output_bucket = output_bucket or content_config["Bucket"]
28        self.role = role
29        self.content_config = content_config or {"Bucket": self.output_bucket}
30        if "Permissions" not in self.content_config:
31            self.content_config["Permissions"] = []
32        self.thumbnail_config = thumbnail_config or {"Bucket": self.output_bucket}
33        if "Permissions" not in self.thumbnail_config:
34            self.thumbnail_config["Permissions"] = []
35
36    def update(self, name, input_bucket, role):
37        if name:
38            self.name = name
39        if input_bucket:
40            self.input_bucket = input_bucket
41        if role:
42            self.role = role
43
44    def to_dict(self):
45        return {
46            "Id": self.id,
47            "Name": self.name,
48            "Arn": self.arn,
49            "Status": self.status,
50            "InputBucket": self.input_bucket,
51            "OutputBucket": self.output_bucket,
52            "Role": self.role,
53            "Notifications": {
54                "Progressing": "",
55                "Completed": "",
56                "Warning": "",
57                "Error": "",
58            },
59            "ContentConfig": self.content_config,
60            "ThumbnailConfig": self.thumbnail_config,
61        }
62
63
64class ElasticTranscoderBackend(BaseBackend):
65    def __init__(self, region_name=None):
66        super(ElasticTranscoderBackend, self).__init__()
67        self.region_name = region_name
68        self.pipelines = {}
69
70    def reset(self):
71        region_name = self.region_name
72        self.__dict__ = {}
73        self.__init__(region_name)
74
75    def create_pipeline(
76        self,
77        name,
78        input_bucket,
79        output_bucket,
80        role,
81        aws_kms_key_arn,
82        notifications,
83        content_config,
84        thumbnail_config,
85    ):
86        pipeline = Pipeline(
87            self.region_name,
88            name,
89            input_bucket,
90            output_bucket,
91            role,
92            content_config,
93            thumbnail_config,
94        )
95        self.pipelines[pipeline.id] = pipeline
96        warnings = []
97        return pipeline, warnings
98
99    def list_pipelines(self):
100        return [p.to_dict() for _, p in self.pipelines.items()]
101
102    def read_pipeline(self, id):
103        return self.pipelines[id]
104
105    def update_pipeline(
106        self,
107        id,
108        name,
109        input_bucket,
110        role,
111        aws_kms_key_arn,
112        notifications,
113        content_config,
114        thumbnail_config,
115    ):
116        pipeline = self.read_pipeline(id)
117        pipeline.update(name, input_bucket, role)
118        warnings = []
119        return pipeline, warnings
120
121    def delete_pipeline(self, pipeline_id):
122        self.pipelines.pop(pipeline_id)
123
124
125elastictranscoder_backends = {}
126for region in Session().get_available_regions("elastictranscoder"):
127    elastictranscoder_backends[region] = ElasticTranscoderBackend(region)
128for region in Session().get_available_regions(
129    "elastictranscoder", partition_name="aws-us-gov"
130):
131    elastictranscoder_backends[region] = ElasticTranscoderBackend(region)
132for region in Session().get_available_regions(
133    "elastictranscoder", partition_name="aws-cn"
134):
135    elastictranscoder_backends[region] = ElasticTranscoderBackend(region)
136