#!/usr/bin/env python

import argparse
import os

import boto
from boto.compat import json
from boto.compat import six
from boto.dynamodb.schema import Schema


DESCRIPTION = """Load data into one or more DynamoDB tables.

For each table, data is read from two files:
 - {table_name}.metadata for the table's name, schema and provisioned
   throughput (only required if creating the table).
 - {table_name}.data for the table's actual contents.

Both files are searched for in the current directory. To read them from
somewhere else, use the --in-dir parameter.

This program does not wipe the tables prior to loading data. However, any
items present in the data files will overwrite the table's contents.
"""


def _json_iterload(fd):
    """Lazily load newline-separated JSON objects from a file-like object.

    A single object may span several lines: input lines are accumulated
    until they form a decodable JSON value, which is then yielded.

    :param fd: an iterable of text lines (e.g. an open text file).
    :raises ValueError: if end-of-file is reached while the buffered text
        is non-blank but not valid JSON (i.e. the file is malformed).
    """
    buf = ""  # accumulates lines until they decode ("buffer" shadows a builtin)
    eof = False
    while not eof:
        try:
            # Add a line to the buffer. Use the next() builtin rather than
            # fd.next(): the latter is Python-2-only and breaks under
            # Python 3 (this file targets both, hence the six import).
            buf += next(fd)
        except StopIteration:
            # We can't let that exception bubble up, otherwise the last
            # object in the file would never be decoded.
            eof = True
        try:
            # Try to decode a JSON object from everything buffered so far.
            json_object = json.loads(buf.strip())

            # Success: clear the buffer (everything was decoded).
            buf = ""
        except ValueError:
            if eof and buf.strip():
                # No more lines to load and the buffer contains something
                # other than whitespace: the file is, in fact, malformed.
                raise
            # We couldn't decode a complete JSON object yet: load more lines.
            continue

        yield json_object


def create_table(metadata_fd, conn=None, name=None):
    """Create a table from a metadata file-like object.

    The metadata JSON is expected to contain ``schema``, ``read_units``
    and ``write_units`` keys, plus ``name`` unless *name* is supplied
    explicitly. Blocks until the new table reaches the ACTIVE state.

    NOTE(review): the original body was an empty stub; this implements it
    with the creation logic that dynamodb_load previously inlined. The
    extra parameters are optional, so the original one-argument call
    signature still works.

    :param metadata_fd: file-like object containing the metadata JSON.
    :param conn: an existing DynamoDB connection; one is opened if None.
    :param name: table name override; falls back to metadata["name"].
    :return: the created (and active) boto table object.
    """
    metadata = json.load(metadata_fd)
    if conn is None:
        conn = boto.connect_dynamodb()
    table = conn.create_table(
        name=metadata["name"] if name is None else name,
        schema=Schema(metadata["schema"]),
        read_units=metadata["read_units"],
        write_units=metadata["write_units"],
    )
    # Table creation is asynchronous; wait until it can accept writes.
    table.refresh(wait_for_active=True)
    return table


def load_table(table, in_fd):
    """Load items into a table from a file-like object.

    Each newline-separated JSON object in *in_fd* becomes one item; any
    list-valued attribute is converted back to a set (DynamoDB set types
    round-trip through JSON as lists).
    """
    for item in _json_iterload(in_fd):
        # Convert lists back to sets.
        data = {}
        for key, value in six.iteritems(item):
            if isinstance(value, list):
                data[key] = set(value)
            else:
                data[key] = value
        table.new_item(attrs=data).put()


def dynamodb_load(tables, in_dir, create_tables):
    """Load every table in *tables* from its files under *in_dir*.

    :param tables: iterable of table names.
    :param in_dir: directory containing the .metadata/.data files.
    :param create_tables: when true, create each table from its metadata
        file; otherwise the table must already exist.
    """
    conn = boto.connect_dynamodb()
    for t in tables:
        metadata_file = os.path.join(in_dir, "%s.metadata" % t)
        data_file = os.path.join(in_dir, "%s.data" % t)
        if create_tables:
            with open(metadata_file) as meta_fd:
                table = create_table(meta_fd, conn=conn, name=t)
        else:
            table = conn.get_table(t)

        with open(data_file) as in_fd:
            load_table(table, in_fd)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="dynamodb_load",
        description=DESCRIPTION
    )
    parser.add_argument(
        "--create-tables",
        action="store_true",
        help="Create the tables if they don't exist already (without this flag, attempts to load data into non-existing tables fail)."
    )
    parser.add_argument("--in-dir", default=".")
    parser.add_argument("tables", metavar="TABLES", nargs="+")

    namespace = parser.parse_args()

    dynamodb_load(namespace.tables, namespace.in_dir, namespace.create_tables)