1from boto3 import Session
2
3from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
4
5
6class TimestreamTable(BaseModel):
7    def __init__(self, region_name, table_name, db_name, retention_properties):
8        self.region_name = region_name
9        self.name = table_name
10        self.db_name = db_name
11        self.retention_properties = retention_properties
12        self.records = []
13
14    def update(self, retention_properties):
15        self.retention_properties = retention_properties
16
17    def write_records(self, records):
18        self.records.append(records)
19
20    @property
21    def arn(self):
22        return f"arn:aws:timestream:{self.region_name}:{ACCOUNT_ID}:database/{self.db_name}/table/{self.name}"
23
24    def description(self):
25        return {
26            "Arn": self.arn,
27            "TableName": self.name,
28            "DatabaseName": self.db_name,
29            "TableStatus": "ACTIVE",
30            "RetentionProperties": self.retention_properties,
31        }
32
33
34class TimestreamDatabase(BaseModel):
35    def __init__(self, region_name, database_name, kms_key_id):
36        self.region_name = region_name
37        self.name = database_name
38        self.kms_key_id = kms_key_id
39        self.tables = dict()
40
41    def update(self, kms_key_id):
42        self.kms_key_id = kms_key_id
43
44    def create_table(self, table_name, retention_properties):
45        table = TimestreamTable(
46            region_name=self.region_name,
47            table_name=table_name,
48            db_name=self.name,
49            retention_properties=retention_properties,
50        )
51        self.tables[table_name] = table
52        return table
53
54    def update_table(self, table_name, retention_properties):
55        table = self.tables[table_name]
56        table.update(retention_properties=retention_properties)
57        return table
58
59    def delete_table(self, table_name):
60        del self.tables[table_name]
61
62    def describe_table(self, table_name):
63        return self.tables[table_name]
64
65    def list_tables(self):
66        return self.tables.values()
67
68    @property
69    def arn(self):
70        return (
71            f"arn:aws:timestream:{self.region_name}:{ACCOUNT_ID}:database/{self.name}"
72        )
73
74    def description(self):
75        return {
76            "Arn": self.arn,
77            "DatabaseName": self.name,
78            "TableCount": len(self.tables.keys()),
79            "KmsKeyId": self.kms_key_id,
80        }
81
82
83class TimestreamWriteBackend(BaseBackend):
84    def __init__(self, region_name):
85        self.region_name = region_name
86        self.databases = dict()
87
88    def create_database(self, database_name, kms_key_id, tags):
89        database = TimestreamDatabase(self.region_name, database_name, kms_key_id)
90        self.databases[database_name] = database
91        return database
92
93    def delete_database(self, database_name):
94        del self.databases[database_name]
95
96    def describe_database(self, database_name):
97        return self.databases[database_name]
98
99    def list_databases(self):
100        return self.databases.values()
101
102    def update_database(self, database_name, kms_key_id):
103        database = self.databases[database_name]
104        database.update(kms_key_id=kms_key_id)
105        return database
106
107    def create_table(self, database_name, table_name, retention_properties):
108        database = self.describe_database(database_name)
109        table = database.create_table(table_name, retention_properties)
110        return table
111
112    def delete_table(self, database_name, table_name):
113        database = self.describe_database(database_name)
114        database.delete_table(table_name)
115
116    def describe_table(self, database_name, table_name):
117        database = self.describe_database(database_name)
118        table = database.describe_table(table_name)
119        return table
120
121    def list_tables(self, database_name):
122        database = self.describe_database(database_name)
123        tables = database.list_tables()
124        return tables
125
126    def update_table(self, database_name, table_name, retention_properties):
127        database = self.describe_database(database_name)
128        table = database.update_table(table_name, retention_properties)
129        return table
130
131    def write_records(self, database_name, table_name, records):
132        database = self.describe_database(database_name)
133        table = database.describe_table(table_name)
134        table.write_records(records)
135
136    def describe_endpoints(self):
137        # https://docs.aws.amazon.com/timestream/latest/developerguide/Using-API.endpoint-discovery.how-it-works.html
138        # Usually, the address look like this:
139        # ingest-cell1.timestream.us-east-1.amazonaws.com
140        # Where 'cell1' can be any number, 'cell2', 'cell3', etc - whichever endpoint happens to be available for that particular account
141        # We don't implement a cellular architecture in Moto though, so let's keep it simple
142        return {
143            "Endpoints": [
144                {
145                    "Address": f"ingest.timestream.{self.region_name}.amazonaws.com",
146                    "CachePeriodInMinutes": 1440,
147                }
148            ]
149        }
150
151    def reset(self):
152        region_name = self.region_name
153        self.__dict__ = {}
154        self.__init__(region_name)
155
156
157timestreamwrite_backends = {}
158for available_region in Session().get_available_regions("timestream-write"):
159    timestreamwrite_backends[available_region] = TimestreamWriteBackend(
160        available_region
161    )
162for available_region in Session().get_available_regions(
163    "timestream-write", partition_name="aws-us-gov"
164):
165    timestreamwrite_backends[available_region] = TimestreamWriteBackend(
166        available_region
167    )
168for available_region in Session().get_available_regions(
169    "timestream-write", partition_name="aws-cn"
170):
171    timestreamwrite_backends[available_region] = TimestreamWriteBackend(
172        available_region
173    )
174
175if len(timestreamwrite_backends) == 0:
176    # Boto does not return any regions at the time of writing (20/10/2021)
177    # Hardcoding the known regions for now
178    # Thanks, Jeff
179    for r in ["us-east-1", "us-east-2", "us-west-2", "eu-central-1", "eu-west-1"]:
180        timestreamwrite_backends[r] = TimestreamWriteBackend(r)
181