# Copyright (c) 2012 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
import time
from binascii import crc32

import boto
from boto.connection import AWSAuthConnection
from boto.exception import DynamoDBResponseError
from boto.provider import Provider
from boto.dynamodb import exceptions as dynamodb_exceptions
from boto.compat import json


class Layer1(AWSAuthConnection):
    """
    This is the lowest-level interface to DynamoDB.  Methods at this
    layer map directly to API requests and parameters to the methods
    are either simple, scalar values or they are the Python equivalent
    of the JSON input as defined in the DynamoDB Developer's Guide.
    All responses are direct decoding of the JSON response bodies to
    Python data structures via the json or simplejson modules.

    :ivar throughput_exceeded_events: An integer variable that
        keeps a running total of the number of ThroughputExceeded
        responses this connection has received from Amazon DynamoDB.
    """

    DefaultRegionName = 'us-east-1'
    """The default region name for DynamoDB API."""

    ServiceName = 'DynamoDB'
    """The name of the Service"""

    Version = '20111205'
    """DynamoDB API version."""

    ThruputError = "ProvisionedThroughputExceededException"
    """The error response returned when provisioned throughput is exceeded"""

    SessionExpiredError = 'com.amazon.coral.service#ExpiredTokenException'
    """The error response returned when session token has expired"""

    ConditionalCheckFailedError = 'ConditionalCheckFailedException'
    """The error response returned when a conditional check fails"""

    ValidationError = 'ValidationException'
    """The error response returned when an item is invalid in some way"""

    ResponseError = DynamoDBResponseError

    NumberRetries = 10
    """The number of times an error is retried."""

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 debug=0, security_token=None, region=None,
                 validate_certs=True, validate_checksums=True,
                 profile_name=None):
        """
        Create a connection to the DynamoDB endpoint of a region.

        :param region: The region object to connect to.  When omitted,
            the region name is read from the ``region`` option of the
            ``DynamoDB`` section of the boto config (falling back to
            ``DefaultRegionName``) and looked up among
            ``boto.dynamodb.regions()``.

        :type validate_checksums: bool
        :param validate_checksums: Default for CRC32 validation of
            responses; the ``validate_checksums`` option of the
            ``DynamoDB`` config section takes precedence when set.

        The remaining arguments are forwarded unchanged to
        ``AWSAuthConnection.__init__``.
        """
        if not region:
            region_name = boto.config.get('DynamoDB', 'region',
                                          self.DefaultRegionName)
            for reg in boto.dynamodb.regions():
                if reg.name == region_name:
                    region = reg
                    break

        # NOTE: if no known region matched ``region_name``, ``region`` is
        # still None here and the ``.endpoint`` access below raises
        # AttributeError.
        self.region = region
        super(Layer1, self).__init__(self.region.endpoint,
                                     aws_access_key_id,
                                     aws_secret_access_key,
                                     is_secure, port, proxy, proxy_port,
                                     debug=debug,
                                     security_token=security_token,
                                     validate_certs=validate_certs,
                                     profile_name=profile_name)
        self.throughput_exceeded_events = 0
        self._validate_checksums = boto.config.getbool(
            'DynamoDB', 'validate_checksums', validate_checksums)

    def _get_session_token(self):
        """
        Build a fresh ``Provider`` and hand it to the auth handler.

        Called by ``_retry_handler`` when the service reports an expired
        session token.
        """
        self.provider = Provider(self._provider_type)
        self._auth_handler.update_provider(self.provider)

    def _required_auth_capability(self):
        """Return the list of auth capabilities this connection needs."""
        return ['hmac-v4']

    def make_request(self, action, body='', object_hook=None):
        """
        POST a single DynamoDB action and return the decoded JSON reply.

        :type action: str
        :param action: The API action name; it is combined with
            ``ServiceName`` and ``Version`` into the ``X-Amz-Target``
            header.

        :type body: str
        :param body: The JSON request body.

        :param object_hook: Passed through to ``json.loads`` when the
            response body is decoded.

        :raises: ``DynamoDBExpiredTokenError`` if the security token expires.
        """
        headers = {'X-Amz-Target': '%s_%s.%s' % (self.ServiceName,
                                                 self.Version, action),
                   'Host': self.region.endpoint,
                   'Content-Type': 'application/x-amz-json-1.0',
                   'Content-Length': str(len(body))}
        http_request = self.build_base_http_request('POST', '/', '/',
                                                    {}, headers, body, None)
        start = time.time()
        response = self._mexe(http_request, sender=None,
                              override_num_retries=self.NumberRetries,
                              retry_handler=self._retry_handler)
        # Elapsed wall-clock time of the call (including any retries), in ms.
        elapsed = (time.time() - start) * 1000
        request_id = response.getheader('x-amzn-RequestId')
        boto.log.debug('RequestId: %s', request_id)
        boto.perflog.debug('%s: id=%s time=%sms',
                           headers['X-Amz-Target'], request_id, int(elapsed))
        response_body = response.read().decode('utf-8')
        boto.log.debug(response_body)
        return json.loads(response_body, object_hook=object_hook)

    def _retry_handler(self, response, i, next_sleep):
        """
        Decide whether a response should be retried.

        This method is handed to ``_mexe`` as its ``retry_handler``.

        :param response: The HTTP response to inspect.
        :param i: The current retry attempt counter.
        :param next_sleep: The sleep time proposed by the caller (it is
            recomputed here whenever a retry is requested).

        :return: ``None`` when the response is acceptable, otherwise a
            ``(message, attempt, sleep_seconds)`` tuple describing the
            retry to perform.

        :raises: ``DynamoDBThroughputExceededError`` on the last
            throughput retry, ``DynamoDBConditionalCheckFailedError``,
            ``DynamoDBValidationError`` or ``ResponseError`` for the
            corresponding 400 responses.
        """
        status = None
        # The body is read at most once and reused, so the checksum below
        # is computed over the same bytes that were parsed/logged even if
        # the response object does not cache its body.
        raw_body = None
        if response.status == 400:
            raw_body = response.read()
            response_body = raw_body.decode('utf-8')
            boto.log.debug(response_body)
            data = json.loads(response_body)
            # A 400 body without ``__type`` must fall through to the
            # generic ResponseError rather than fail on ``in None``.
            error_type = data.get('__type') or ''
            if self.ThruputError in error_type:
                self.throughput_exceeded_events += 1
                msg = "%s, retry attempt %s" % (self.ThruputError, i)
                next_sleep = self._exponential_time(i)
                i += 1
                status = (msg, i, next_sleep)
                if i == self.NumberRetries:
                    # If this was our last retry attempt, raise
                    # a specific error saying that the throughput
                    # was exceeded.
                    raise dynamodb_exceptions.DynamoDBThroughputExceededError(
                        response.status, response.reason, data)
            elif self.SessionExpiredError in error_type:
                msg = 'Renewing Session Token'
                self._get_session_token()
                status = (msg, i + self.num_retries - 1, 0)
            elif self.ConditionalCheckFailedError in error_type:
                raise dynamodb_exceptions.DynamoDBConditionalCheckFailedError(
                    response.status, response.reason, data)
            elif self.ValidationError in error_type:
                raise dynamodb_exceptions.DynamoDBValidationError(
                    response.status, response.reason, data)
            else:
                raise self.ResponseError(response.status, response.reason,
                                         data)
        expected_crc32 = response.getheader('x-amz-crc32')
        if self._validate_checksums and expected_crc32 is not None:
            if raw_body is None:
                raw_body = response.read()
            boto.log.debug('Validating crc32 checksum for body: %s',
                           raw_body.decode('utf-8'))
            # Mask to an unsigned 32-bit value so the comparison with the
            # header is the same on every Python version.
            actual_crc32 = crc32(raw_body) & 0xffffffff
            expected_crc32 = int(expected_crc32)
            if actual_crc32 != expected_crc32:
                msg = ("The calculated checksum %s did not match the expected "
                       "checksum %s" % (actual_crc32, expected_crc32))
                status = (msg, i + 1, self._exponential_time(i))
        return status

    def _exponential_time(self, i):
        """
        Return the back-off delay, in seconds, for retry attempt ``i``.

        Attempt 0 is retried immediately; later attempts wait
        ``0.05 * 2 ** i`` seconds, capped by the ``max_retry_delay``
        option of the ``Boto`` config section (default 60).
        """
        if i == 0:
            return 0
        # The option comes back as a string when it is set in a config
        # file, so coerce it before comparing it with the computed delay.
        max_delay = float(boto.config.get('Boto', 'max_retry_delay', 60))
        return min(0.05 * (2 ** i), max_delay)

    def list_tables(self, limit=None, start_table=None):
        """
        Returns a dictionary of results.  The dictionary contains
        a **TableNames** key whose value is a list of the table names.
        The dictionary could also contain a **LastEvaluatedTableName**
        key whose value would be the last table name returned if
        the complete list of table names was not returned.  This
        value would then be passed as the ``start_table`` parameter on
        a subsequent call to this method.

        :type limit: int
        :param limit: The maximum number of tables to return.

        :type start_table: str
        :param start_table: The name of the table that starts the
            list.  If you ran a previous list_tables and not
            all results were returned, the response dict would
            include a LastEvaluatedTableName attribute.  Use
            that value here to continue the listing.
        """
        data = {}
        if limit:
            data['Limit'] = limit
        if start_table:
            data['ExclusiveStartTableName'] = start_table
        json_input = json.dumps(data)
        return self.make_request('ListTables', json_input)

    def describe_table(self, table_name):
        """
        Returns information about the table including current
        state of the table, primary key schema and when the
        table was created.

        :type table_name: str
        :param table_name: The name of the table to describe.
        """
        data = {'TableName': table_name}
        json_input = json.dumps(data)
        return self.make_request('DescribeTable', json_input)

    def create_table(self, table_name, schema, provisioned_throughput):
        """
        Add a new table to your account.  The table name must be unique
        among those associated with the account issuing the request.
        This request triggers an asynchronous workflow to begin creating
        the table.  When the workflow is complete, the state of the
        table will be ACTIVE.

        :type table_name: str
        :param table_name: The name of the table to create.

        :type schema: dict
        :param schema: A Python version of the KeySchema data structure
            as defined by DynamoDB

        :type provisioned_throughput: dict
        :param provisioned_throughput: A Python version of the
            ProvisionedThroughput data structure defined by
            DynamoDB.
        """
        data = {'TableName': table_name,
                'KeySchema': schema,
                'ProvisionedThroughput': provisioned_throughput}
        json_input = json.dumps(data)
        return self.make_request('CreateTable', json_input)

    def update_table(self, table_name, provisioned_throughput):
        """
        Updates the provisioned throughput for a given table.

        :type table_name: str
        :param table_name: The name of the table to update.

        :type provisioned_throughput: dict
        :param provisioned_throughput: A Python version of the
            ProvisionedThroughput data structure defined by
            DynamoDB.
        """
        data = {'TableName': table_name,
                'ProvisionedThroughput': provisioned_throughput}
        json_input = json.dumps(data)
        return self.make_request('UpdateTable', json_input)

    def delete_table(self, table_name):
        """
        Deletes the table and all of its data.  After this request
        the table will be in the DELETING state until DynamoDB
        completes the delete operation.

        :type table_name: str
        :param table_name: The name of the table to delete.
        """
        data = {'TableName': table_name}
        json_input = json.dumps(data)
        return self.make_request('DeleteTable', json_input)

    def get_item(self, table_name, key, attributes_to_get=None,
                 consistent_read=False, object_hook=None):
        """
        Return a set of attributes for an item that matches
        the supplied key.

        :type table_name: str
        :param table_name: The name of the table containing the item.

        :type key: dict
        :param key: A Python version of the Key data structure
            defined by DynamoDB.

        :type attributes_to_get: list
        :param attributes_to_get: A list of attribute names.
            If supplied, only the specified attribute names will
            be returned.  Otherwise, all attributes will be returned.

        :type consistent_read: bool
        :param consistent_read: If True, a consistent read
            request is issued.  Otherwise, an eventually consistent
            request is issued.

        :param object_hook: Passed through to ``json.loads`` when the
            response body is decoded.

        :raises: ``DynamoDBKeyNotFoundError`` if the response contains
            no ``Item`` entry.
        """
        data = {'TableName': table_name,
                'Key': key}
        if attributes_to_get:
            data['AttributesToGet'] = attributes_to_get
        if consistent_read:
            data['ConsistentRead'] = True
        json_input = json.dumps(data)
        response = self.make_request('GetItem', json_input,
                                     object_hook=object_hook)
        if 'Item' not in response:
            raise dynamodb_exceptions.DynamoDBKeyNotFoundError(
                "Key does not exist."
            )
        return response

    def batch_get_item(self, request_items, object_hook=None):
        """
        Return a set of attributes for multiple items in
        multiple tables using their primary keys.

        :type request_items: dict
        :param request_items: A Python version of the RequestItems
            data structure defined by DynamoDB.

        :param object_hook: Passed through to ``json.loads`` when the
            response body is decoded.

        :return: The decoded response, or an empty dict (without
            issuing a request) when ``request_items`` is empty.
        """
        # If the list is empty, return empty response
        if not request_items:
            return {}
        data = {'RequestItems': request_items}
        json_input = json.dumps(data)
        return self.make_request('BatchGetItem', json_input,
                                 object_hook=object_hook)

    def batch_write_item(self, request_items, object_hook=None):
        """
        This operation enables you to put or delete several items
        across multiple tables in a single API call.

        :type request_items: dict
        :param request_items: A Python version of the RequestItems
            data structure defined by DynamoDB.

        :param object_hook: Passed through to ``json.loads`` when the
            response body is decoded.
        """
        data = {'RequestItems': request_items}
        json_input = json.dumps(data)
        return self.make_request('BatchWriteItem', json_input,
                                 object_hook=object_hook)

    def put_item(self, table_name, item,
                 expected=None, return_values=None,
                 object_hook=None):
        """
        Create a new item or replace an old item with a new
        item (including all attributes).  If an item already
        exists in the specified table with the same primary
        key, the new item will completely replace the old item.
        You can perform a conditional put by specifying an
        expected rule.

        :type table_name: str
        :param table_name: The name of the table in which to put the item.

        :type item: dict
        :param item: A Python version of the Item data structure
            defined by DynamoDB.

        :type expected: dict
        :param expected: A Python version of the Expected
            data structure defined by DynamoDB.

        :type return_values: str
        :param return_values: Controls the return of attribute
            name-value pairs before they were changed.  Possible
            values are: None or 'ALL_OLD'. If 'ALL_OLD' is
            specified and the item is overwritten, the content
            of the old item is returned.

        :param object_hook: Passed through to ``json.loads`` when the
            response body is decoded.
        """
        data = {'TableName': table_name,
                'Item': item}
        if expected:
            data['Expected'] = expected
        if return_values:
            data['ReturnValues'] = return_values
        json_input = json.dumps(data)
        return self.make_request('PutItem', json_input,
                                 object_hook=object_hook)

    def update_item(self, table_name, key, attribute_updates,
                    expected=None, return_values=None,
                    object_hook=None):
        """
        Edits an existing item's attributes. You can perform a conditional
        update (insert a new attribute name-value pair if it doesn't exist,
        or replace an existing name-value pair if it has certain expected
        attribute values).

        :type table_name: str
        :param table_name: The name of the table.

        :type key: dict
        :param key: A Python version of the Key data structure
            defined by DynamoDB which identifies the item to be updated.

        :type attribute_updates: dict
        :param attribute_updates: A Python version of the AttributeUpdates
            data structure defined by DynamoDB.

        :type expected: dict
        :param expected: A Python version of the Expected
            data structure defined by DynamoDB.

        :type return_values: str
        :param return_values: Controls which attribute name-value
            pairs are returned.  The value is sent unchanged as
            ``ReturnValues`` (for example 'ALL_OLD'); see the DynamoDB
            UpdateItem documentation for the accepted values.

        :param object_hook: Passed through to ``json.loads`` when the
            response body is decoded.
        """
        data = {'TableName': table_name,
                'Key': key,
                'AttributeUpdates': attribute_updates}
        if expected:
            data['Expected'] = expected
        if return_values:
            data['ReturnValues'] = return_values
        json_input = json.dumps(data)
        return self.make_request('UpdateItem', json_input,
                                 object_hook=object_hook)

    def delete_item(self, table_name, key,
                    expected=None, return_values=None,
                    object_hook=None):
        """
        Delete an item and all of its attributes by primary key.
        You can perform a conditional delete by specifying an
        expected rule.

        :type table_name: str
        :param table_name: The name of the table containing the item.

        :type key: dict
        :param key: A Python version of the Key data structure
            defined by DynamoDB.

        :type expected: dict
        :param expected: A Python version of the Expected
            data structure defined by DynamoDB.

        :type return_values: str
        :param return_values: Controls the return of attribute
            name-value pairs before they were changed.  Possible
            values are: None or 'ALL_OLD'. If 'ALL_OLD' is
            specified and the item is deleted, the content
            of the old item is returned.

        :param object_hook: Passed through to ``json.loads`` when the
            response body is decoded.
        """
        data = {'TableName': table_name,
                'Key': key}
        if expected:
            data['Expected'] = expected
        if return_values:
            data['ReturnValues'] = return_values
        json_input = json.dumps(data)
        return self.make_request('DeleteItem', json_input,
                                 object_hook=object_hook)

    def query(self, table_name, hash_key_value, range_key_conditions=None,
              attributes_to_get=None, limit=None, consistent_read=False,
              scan_index_forward=True, exclusive_start_key=None,
              object_hook=None, count=False):
        """
        Perform a query of DynamoDB.  This version is currently punting
        and expecting you to provide a full and correct JSON body
        which is passed as is to DynamoDB.

        :type table_name: str
        :param table_name: The name of the table to query.

        :type hash_key_value: dict
        :param hash_key_value: A DynamoDB-style HashKeyValue.

        :type range_key_conditions: dict
        :param range_key_conditions: A Python version of the
            RangeKeyConditions data structure.

        :type attributes_to_get: list
        :param attributes_to_get: A list of attribute names.
            If supplied, only the specified attribute names will
            be returned.  Otherwise, all attributes will be returned.

        :type limit: int
        :param limit: The maximum number of items to return.

        :type count: bool
        :param count: If True, Amazon DynamoDB returns a total
            number of items for the Query operation, even if the
            operation has no matching items for the assigned filter.

        :type consistent_read: bool
        :param consistent_read: If True, a consistent read
            request is issued.  Otherwise, an eventually consistent
            request is issued.

        :type scan_index_forward: bool
        :param scan_index_forward: Specified forward or backward
            traversal of the index.  Default is forward (True).

        :type exclusive_start_key: list or tuple
        :param exclusive_start_key: Primary key of the item from
            which to continue an earlier query.  This would be
            provided as the LastEvaluatedKey in that query.

        :param object_hook: Passed through to ``json.loads`` when the
            response body is decoded.
        """
        data = {'TableName': table_name,
                'HashKeyValue': hash_key_value}
        if range_key_conditions:
            data['RangeKeyCondition'] = range_key_conditions
        if attributes_to_get:
            data['AttributesToGet'] = attributes_to_get
        if limit:
            data['Limit'] = limit
        if count:
            data['Count'] = True
        if consistent_read:
            data['ConsistentRead'] = True
        # Always sent, and always as a real JSON boolean.
        data['ScanIndexForward'] = bool(scan_index_forward)
        if exclusive_start_key:
            data['ExclusiveStartKey'] = exclusive_start_key
        json_input = json.dumps(data)
        return self.make_request('Query', json_input,
                                 object_hook=object_hook)

    def scan(self, table_name, scan_filter=None,
             attributes_to_get=None, limit=None,
             exclusive_start_key=None, object_hook=None, count=False):
        """
        Perform a scan of DynamoDB.  This version is currently punting
        and expecting you to provide a full and correct JSON body
        which is passed as is to DynamoDB.

        :type table_name: str
        :param table_name: The name of the table to scan.

        :type scan_filter: dict
        :param scan_filter: A Python version of the
            ScanFilter data structure.

        :type attributes_to_get: list
        :param attributes_to_get: A list of attribute names.
            If supplied, only the specified attribute names will
            be returned.  Otherwise, all attributes will be returned.

        :type limit: int
        :param limit: The maximum number of items to evaluate.

        :type count: bool
        :param count: If True, Amazon DynamoDB returns a total
            number of items for the Scan operation, even if the
            operation has no matching items for the assigned filter.

        :type exclusive_start_key: list or tuple
        :param exclusive_start_key: Primary key of the item from
            which to continue an earlier query.  This would be
            provided as the LastEvaluatedKey in that query.

        :param object_hook: Passed through to ``json.loads`` when the
            response body is decoded.
        """
        data = {'TableName': table_name}
        if scan_filter:
            data['ScanFilter'] = scan_filter
        if attributes_to_get:
            data['AttributesToGet'] = attributes_to_get
        if limit:
            data['Limit'] = limit
        if count:
            data['Count'] = True
        if exclusive_start_key:
            data['ExclusiveStartKey'] = exclusive_start_key
        json_input = json.dumps(data)
        return self.make_request('Scan', json_input, object_hook=object_hook)