#!/usr/bin/env python

import argparse
import os

import boto
from boto.compat import json
from boto.compat import six
from boto.dynamodb.schema import Schema


DESCRIPTION = """Load data into one or more DynamoDB tables.

For each table, data is read from two files:
  - {table_name}.metadata for the table's name, schema and provisioned
    throughput (only required if creating the table).
  - {table_name}.data for the table's actual contents.

Both files are searched for in the current directory. To read them from
somewhere else, use the --in-dir parameter.

This program does not wipe the tables prior to loading data. However, any
item present in a data file overwrites the existing item with the same key
in the table.
"""


def _json_iterload(fd):
    """Lazily load newline-separated JSON objects from a file-like object."""
    buffer = ""
    eof = False
    while not eof:
        try:
            # Add a line to the buffer (next() works on both Python 2 and 3
            # file iterators, unlike the Python 2-only fd.next()).
            buffer += next(fd)
        except StopIteration:
            # We can't let that exception bubble up, otherwise the last
            # object in the file will never be decoded.
            eof = True
        try:
            # Try to decode a JSON object.
            json_object = json.loads(buffer.strip())

            # Success: clear the buffer (everything was decoded).
            buffer = ""
        except ValueError:
            if eof and buffer.strip():
                # No more lines to load and the buffer contains something other
                # than whitespace: the file is, in fact, malformed.
                raise
            # We couldn't decode a complete JSON object: load more lines.
            continue

        yield json_object
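
# Illustrative behaviour of _json_iterload() (example input made up for this
# comment): given a stream containing
#
#   {"id": 1}
#   {"id": 2,
#    "tags": ["a"]}
#
# it yields {"id": 1} and then {"id": 2, "tags": ["a"]}, buffering input lines
# until they form a complete JSON document.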


def create_table(metadata_fd):
    """Create a table from a metadata file-like object."""
    # Currently a stub: table creation is handled inline in dynamodb_load()
    # below.


def load_table(table, in_fd):
    """Load items into a table from a file-like object."""
    for item in _json_iterload(in_fd):
        # Convert lists back to sets: JSON has no set type, so the dump
        # serializes DynamoDB set attributes as lists.
        data = {}
        for k, v in six.iteritems(item):
            if isinstance(v, list):
                data[k] = set(v)
            else:
                data[k] = v
        table.new_item(attrs=data).put()
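
# For example (illustrative input, not taken from the original docs), a data
# line of
#
#   {"id": "u1", "tags": ["a", "b"]}
#
# is stored by load_table() as the item {"id": "u1", "tags": set(["a", "b"])},
# since boto's DynamoDB layer represents multi-valued attributes as Python
# sets.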


def dynamodb_load(tables, in_dir, create_tables):
    conn = boto.connect_dynamodb()
    for t in tables:
        metadata_file = os.path.join(in_dir, "%s.metadata" % t)
        data_file = os.path.join(in_dir, "%s.data" % t)
        if create_tables:
            with open(metadata_file) as meta_fd:
                metadata = json.load(meta_fd)
            table = conn.create_table(
                name=t,
                schema=Schema(metadata["schema"]),
                read_units=metadata["read_units"],
                write_units=metadata["write_units"],
            )
            # Wait until the new table becomes ACTIVE before writing to it.
            table.refresh(wait_for_active=True)
        else:
            table = conn.get_table(t)

        with open(data_file) as in_fd:
            load_table(table, in_fd)


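# Example invocation (the table and directory names are made up for
# illustration; the script name may differ depending on how it is installed):
#
#   $ python dynamodb_load --create-tables --in-dir ./dump users sessions
#
# This expects ./dump/users.metadata, ./dump/users.data and the corresponding
# "sessions" files to exist, creates both tables, waits for them to become
# ACTIVE, and then loads their items.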
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="dynamodb_load",
        description=DESCRIPTION,
        # Preserve the hand-formatted list in DESCRIPTION instead of letting
        # argparse reflow it.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--create-tables",
        action="store_true",
        help="Create the tables if they don't exist already (without this "
             "flag, attempts to load data into nonexistent tables fail).",
    )
    parser.add_argument(
        "--in-dir",
        default=".",
        help="Directory in which the .metadata and .data files are looked up "
             "(defaults to the current directory).",
    )
    parser.add_argument(
        "tables",
        metavar="TABLES",
        nargs="+",
        help="Names of the tables to load.",
    )

    namespace = parser.parse_args()

    dynamodb_load(namespace.tables, namespace.in_dir, namespace.create_tables)