1from boto3 import Session 2 3from moto.core import ACCOUNT_ID, BaseBackend, BaseModel 4 5 6class TimestreamTable(BaseModel): 7 def __init__(self, region_name, table_name, db_name, retention_properties): 8 self.region_name = region_name 9 self.name = table_name 10 self.db_name = db_name 11 self.retention_properties = retention_properties 12 self.records = [] 13 14 def update(self, retention_properties): 15 self.retention_properties = retention_properties 16 17 def write_records(self, records): 18 self.records.append(records) 19 20 @property 21 def arn(self): 22 return f"arn:aws:timestream:{self.region_name}:{ACCOUNT_ID}:database/{self.db_name}/table/{self.name}" 23 24 def description(self): 25 return { 26 "Arn": self.arn, 27 "TableName": self.name, 28 "DatabaseName": self.db_name, 29 "TableStatus": "ACTIVE", 30 "RetentionProperties": self.retention_properties, 31 } 32 33 34class TimestreamDatabase(BaseModel): 35 def __init__(self, region_name, database_name, kms_key_id): 36 self.region_name = region_name 37 self.name = database_name 38 self.kms_key_id = kms_key_id 39 self.tables = dict() 40 41 def update(self, kms_key_id): 42 self.kms_key_id = kms_key_id 43 44 def create_table(self, table_name, retention_properties): 45 table = TimestreamTable( 46 region_name=self.region_name, 47 table_name=table_name, 48 db_name=self.name, 49 retention_properties=retention_properties, 50 ) 51 self.tables[table_name] = table 52 return table 53 54 def update_table(self, table_name, retention_properties): 55 table = self.tables[table_name] 56 table.update(retention_properties=retention_properties) 57 return table 58 59 def delete_table(self, table_name): 60 del self.tables[table_name] 61 62 def describe_table(self, table_name): 63 return self.tables[table_name] 64 65 def list_tables(self): 66 return self.tables.values() 67 68 @property 69 def arn(self): 70 return ( 71 f"arn:aws:timestream:{self.region_name}:{ACCOUNT_ID}:database/{self.name}" 72 ) 73 74 def description(self): 75 return { 76 "Arn": self.arn, 77 "DatabaseName": self.name, 78 "TableCount": len(self.tables.keys()), 79 "KmsKeyId": self.kms_key_id, 80 } 81 82 83class TimestreamWriteBackend(BaseBackend): 84 def __init__(self, region_name): 85 self.region_name = region_name 86 self.databases = dict() 87 88 def create_database(self, database_name, kms_key_id, tags): 89 database = TimestreamDatabase(self.region_name, database_name, kms_key_id) 90 self.databases[database_name] = database 91 return database 92 93 def delete_database(self, database_name): 94 del self.databases[database_name] 95 96 def describe_database(self, database_name): 97 return self.databases[database_name] 98 99 def list_databases(self): 100 return self.databases.values() 101 102 def update_database(self, database_name, kms_key_id): 103 database = self.databases[database_name] 104 database.update(kms_key_id=kms_key_id) 105 return database 106 107 def create_table(self, database_name, table_name, retention_properties): 108 database = self.describe_database(database_name) 109 table = database.create_table(table_name, retention_properties) 110 return table 111 112 def delete_table(self, database_name, table_name): 113 database = self.describe_database(database_name) 114 database.delete_table(table_name) 115 116 def describe_table(self, database_name, table_name): 117 database = self.describe_database(database_name) 118 table = database.describe_table(table_name) 119 return table 120 121 def list_tables(self, database_name): 122 database = self.describe_database(database_name) 123 tables = database.list_tables() 124 return tables 125 126 def update_table(self, database_name, table_name, retention_properties): 127 database = self.describe_database(database_name) 128 table = database.update_table(table_name, retention_properties) 129 return table 130 131 def write_records(self, database_name, table_name, records): 132 database = self.describe_database(database_name) 133 table = database.describe_table(table_name) 134 table.write_records(records) 135 136 def describe_endpoints(self): 137 # https://docs.aws.amazon.com/timestream/latest/developerguide/Using-API.endpoint-discovery.how-it-works.html 138 # Usually, the address look like this: 139 # ingest-cell1.timestream.us-east-1.amazonaws.com 140 # Where 'cell1' can be any number, 'cell2', 'cell3', etc - whichever endpoint happens to be available for that particular account 141 # We don't implement a cellular architecture in Moto though, so let's keep it simple 142 return { 143 "Endpoints": [ 144 { 145 "Address": f"ingest.timestream.{self.region_name}.amazonaws.com", 146 "CachePeriodInMinutes": 1440, 147 } 148 ] 149 } 150 151 def reset(self): 152 region_name = self.region_name 153 self.__dict__ = {} 154 self.__init__(region_name) 155 156 157timestreamwrite_backends = {} 158for available_region in Session().get_available_regions("timestream-write"): 159 timestreamwrite_backends[available_region] = TimestreamWriteBackend( 160 available_region 161 ) 162for available_region in Session().get_available_regions( 163 "timestream-write", partition_name="aws-us-gov" 164): 165 timestreamwrite_backends[available_region] = TimestreamWriteBackend( 166 available_region 167 ) 168for available_region in Session().get_available_regions( 169 "timestream-write", partition_name="aws-cn" 170): 171 timestreamwrite_backends[available_region] = TimestreamWriteBackend( 172 available_region 173 ) 174 175if len(timestreamwrite_backends) == 0: 176 # Boto does not return any regions at the time of writing (20/10/2021) 177 # Hardcoding the known regions for now 178 # Thanks, Jeff 179 for r in ["us-east-1", "us-east-2", "us-west-2", "eu-central-1", "eu-west-1"]: 180 timestreamwrite_backends[r] = TimestreamWriteBackend(r) 181