1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/bigtable/examples/bigtable_examples_common.h"
16 #include "google/cloud/bigtable/table.h"
17 #include "google/cloud/internal/getenv.h"
18 #include "google/cloud/internal/random.h"
19 #include "google/cloud/testing_util/crash_handler.h"
20 #include <sstream>
21
22 namespace {
23
AsyncApply(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const & argv)24 void AsyncApply(google::cloud::bigtable::Table table,
25 google::cloud::bigtable::CompletionQueue cq,
26 std::vector<std::string> const& argv) {
27 //! [async-apply]
28 namespace cbt = google::cloud::bigtable;
29 using google::cloud::future;
30 using google::cloud::StatusOr;
31 [](cbt::Table table, cbt::CompletionQueue cq, std::string const& row_key) {
32 auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
33 std::chrono::system_clock::now().time_since_epoch());
34
35 cbt::SingleRowMutation mutation(row_key);
36 mutation.emplace_back(
37 cbt::SetCell("fam", "column0", timestamp, "value for column0"));
38 mutation.emplace_back(
39 cbt::SetCell("fam", "column1", timestamp, "value for column1"));
40
41 future<google::cloud::Status> status_future =
42 table.AsyncApply(std::move(mutation), cq);
43 auto status = status_future.get();
44 if (!status.ok()) throw std::runtime_error(status.message());
45 std::cout << "Successfully applied mutation\n";
46 }
47 //! [async-apply]
48 (std::move(table), std::move(cq), argv.at(0));
49 }
50
AsyncBulkApply(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const &)51 void AsyncBulkApply(google::cloud::bigtable::Table table,
52 google::cloud::bigtable::CompletionQueue cq,
53 std::vector<std::string> const&) {
54 //! [bulk async-bulk-apply]
55 namespace cbt = google::cloud::bigtable;
56 using google::cloud::future;
57 [](cbt::Table table, cbt::CompletionQueue cq) {
58 // Write several rows in a single operation, each row has some trivial data.
59 cbt::BulkMutation bulk;
60 for (int i = 0; i != 5000; ++i) {
61 // Note: This example uses sequential numeric IDs for simplicity, but
62 // this can result in poor performance in a production application.
63 // Since rows are stored in sorted order by key, sequential keys can
64 // result in poor distribution of operations across nodes.
65 //
66 // For more information about how to design a Bigtable schema for the
67 // best performance, see the documentation:
68 //
69 // https://cloud.google.com/bigtable/docs/schema-design
70 char buf[32];
71 snprintf(buf, sizeof(buf), "key-%06d", i);
72 cbt::SingleRowMutation mutation(buf);
73 mutation.emplace_back(
74 cbt::SetCell("fam", "col0", "value0-" + std::to_string(i)));
75 mutation.emplace_back(
76 cbt::SetCell("fam", "col1", "value2-" + std::to_string(i)));
77 mutation.emplace_back(
78 cbt::SetCell("fam", "col2", "value3-" + std::to_string(i)));
79 mutation.emplace_back(
80 cbt::SetCell("fam", "col3", "value4-" + std::to_string(i)));
81 bulk.emplace_back(std::move(mutation));
82 }
83
84 table.AsyncBulkApply(std::move(bulk), cq)
85 .then([](future<std::vector<cbt::FailedMutation>> ft) {
86 auto failures = ft.get();
87 if (failures.empty()) {
88 std::cout << "All the mutations were successful\n";
89 return;
90 }
91 // By default, the `table` object uses the
92 // `SafeIdempotentMutationPolicy` which does not retry if any of the
93 // mutations fails and are not idempotent. In this example we simply
94 // print such failures, if any, and ignore them otherwise.
95 std::cerr << "The following mutations failed and were not retried:\n";
96 for (auto const& f : failures) {
97 std::cerr << "index[" << f.original_index() << "]=" << f.status()
98 << "\n";
99 }
100 })
101 .get(); // block to simplify the example
102 }
103 //! [bulk async-bulk-apply]
104 (std::move(table), std::move(cq));
105 }
106
AsyncReadRows(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const &)107 void AsyncReadRows(google::cloud::bigtable::Table table,
108 google::cloud::bigtable::CompletionQueue cq,
109 std::vector<std::string> const&) {
110 //! [async read rows]
111 namespace cbt = google::cloud::bigtable;
112 using google::cloud::make_ready_future;
113 using google::cloud::promise;
114 using google::cloud::Status;
115 [](cbt::CompletionQueue cq, cbt::Table table) {
116 // Create the range of rows to read.
117 auto range = cbt::RowRange::Range("key-000010", "key-000020");
118 // Filter the results, only include values from the "col0" column in the
119 // "fam" column family, and only get the latest value.
120 auto filter = cbt::Filter::Chain(
121 cbt::Filter::ColumnRangeClosed("fam", "col0", "col0"),
122 cbt::Filter::Latest(1));
123 promise<Status> stream_status_promise;
124 // Read and print the rows.
125 table.AsyncReadRows(
126 cq,
127 [](cbt::Row const& row) {
128 if (row.cells().size() != 1) {
129 std::cout << "Unexpected number of cells in " << row.row_key()
130 << "\n";
131 return make_ready_future(false);
132 }
133 auto const& cell = row.cells().at(0);
134 std::cout << cell.row_key() << " = [" << cell.value() << "]\n";
135 return make_ready_future(true);
136 },
137 [&stream_status_promise](Status const& stream_status) {
138 stream_status_promise.set_value(stream_status);
139 },
140 range, filter);
141 Status stream_status = stream_status_promise.get_future().get();
142 if (!stream_status.ok()) throw std::runtime_error(stream_status.message());
143 }
144 //! [async read rows]
145 (std::move(cq), std::move(table));
146 }
147
AsyncReadRowsWithLimit(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const &)148 void AsyncReadRowsWithLimit(google::cloud::bigtable::Table table,
149 google::cloud::bigtable::CompletionQueue cq,
150 std::vector<std::string> const&) {
151 //! [async read rows with limit]
152 namespace cbt = google::cloud::bigtable;
153 using google::cloud::make_ready_future;
154 using google::cloud::promise;
155 using google::cloud::Status;
156 [](cbt::CompletionQueue cq, cbt::Table table) {
157 // Create the range of rows to read.
158 auto range = cbt::RowRange::Range("key-000010", "key-000020");
159 // Filter the results, only include values from the "col0" column in the
160 // "fam" column family, and only get the latest value.
161 auto filter = cbt::Filter::Chain(
162 cbt::Filter::ColumnRangeClosed("fam", "col0", "col0"),
163 cbt::Filter::Latest(1));
164 promise<Status> stream_status_promise;
165 // Read and print the rows.
166 table.AsyncReadRows(
167 cq,
168 [](cbt::Row const& row) {
169 if (row.cells().size() != 1) {
170 std::cout << "Unexpected number of cells in " << row.row_key()
171 << "\n";
172 return make_ready_future(false);
173 }
174 auto const& cell = row.cells().at(0);
175 std::cout << cell.row_key() << " = [" << cell.value() << "]\n";
176 return make_ready_future(true);
177 },
178 [&stream_status_promise](Status const& stream_status) {
179 stream_status_promise.set_value(stream_status);
180 },
181 range, filter);
182 Status stream_status = stream_status_promise.get_future().get();
183 if (!stream_status.ok()) throw std::runtime_error(stream_status.message());
184 }
185 //! [async read rows with limit]
186 (std::move(cq), std::move(table));
187 }
188
AsyncReadRow(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const & argv)189 void AsyncReadRow(google::cloud::bigtable::Table table,
190 google::cloud::bigtable::CompletionQueue cq,
191 std::vector<std::string> const& argv) {
192 //! [async read row]
193 namespace cbt = google::cloud::bigtable;
194 using google::cloud::future;
195 using google::cloud::StatusOr;
196 [](cbt::CompletionQueue cq, google::cloud::bigtable::Table table,
197 std::string const& row_key) {
198 // Filter the results, only include the latest value on each cell.
199 cbt::Filter filter = cbt::Filter::Latest(1);
200 table.AsyncReadRow(cq, row_key, std::move(filter))
201 .then(
202 [row_key](future<StatusOr<std::pair<bool, cbt::Row>>> row_future) {
203 // Read a row, this returns a tuple (bool, row)
204 auto tuple = row_future.get();
205 if (!tuple) throw std::runtime_error(tuple.status().message());
206 if (!tuple->first) {
207 std::cout << "Row " << row_key << " not found\n";
208 return;
209 }
210 std::cout << "key: " << tuple->second.row_key() << "\n";
211 for (auto& cell : tuple->second.cells()) {
212 std::cout << " " << cell.family_name() << ":"
213 << cell.column_qualifier() << " = <";
214 if (cell.column_qualifier() == "counter") {
215 // This example uses "counter" to store 64-bit numbers in
216 // big-endian format, extract them as follows:
217 std::cout
218 << cell.decode_big_endian_integer<std::int64_t>().value();
219 } else {
220 std::cout << cell.value();
221 }
222 std::cout << ">\n";
223 }
224 })
225 .get(); // block to simplify the example
226 }
227 //! [async read row]
228 (std::move(cq), std::move(table), argv.at(0));
229 }
230
AsyncCheckAndMutate(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const & argv)231 void AsyncCheckAndMutate(google::cloud::bigtable::Table table,
232 google::cloud::bigtable::CompletionQueue cq,
233 std::vector<std::string> const& argv) {
234 //! [async check and mutate]
235 namespace cbt = google::cloud::bigtable;
236 using google::cloud::future;
237 using google::cloud::StatusOr;
238 [](cbt::Table table, cbt::CompletionQueue cq, std::string const& row_key) {
239 // Check if the latest value of the flip-flop column is "on".
240 cbt::Filter predicate = cbt::Filter::Chain(
241 cbt::Filter::ColumnRangeClosed("fam", "flip-flop", "flip-flop"),
242 cbt::Filter::Latest(1), cbt::Filter::ValueRegex("on"));
243 future<StatusOr<cbt::MutationBranch>> branch_future =
244 table.AsyncCheckAndMutateRow(row_key, std::move(predicate),
245 {cbt::SetCell("fam", "flip-flop", "off"),
246 cbt::SetCell("fam", "flop-flip", "on")},
247 {cbt::SetCell("fam", "flip-flop", "on"),
248 cbt::SetCell("fam", "flop-flip", "off")},
249 cq);
250
251 branch_future
252 .then([](future<StatusOr<cbt::MutationBranch>> f) {
253 auto response = f.get();
254 if (!response) throw std::runtime_error(response.status().message());
255 if (*response == cbt::MutationBranch::kPredicateMatched) {
256 std::cout << "The predicate was matched\n";
257 } else {
258 std::cout << "The predicate was not matched\n";
259 }
260 })
261 .get(); // block to simplify the example.
262 }
263 //! [async check and mutate]
264 (std::move(table), std::move(cq), argv.at(0));
265 }
266
AsyncReadModifyWrite(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const & argv)267 void AsyncReadModifyWrite(google::cloud::bigtable::Table table,
268 google::cloud::bigtable::CompletionQueue cq,
269 std::vector<std::string> const& argv) {
270 //! [async read modify write]
271 namespace cbt = google::cloud::bigtable;
272 using google::cloud::future;
273 using google::cloud::StatusOr;
274 [](cbt::Table table, cbt::CompletionQueue cq, std::string const& row_key) {
275 future<StatusOr<cbt::Row>> row_future = table.AsyncReadModifyWriteRow(
276 std::move(row_key), cq,
277 cbt::ReadModifyWriteRule::AppendValue("fam", "list", ";element"));
278
279 row_future
280 .then([](future<StatusOr<cbt::Row>> f) {
281 auto row = f.get();
282 // As the modify in this example is not idempotent, and this example
283 // does not attempt to retry if there is a failure, we simply print
284 // such failures, if any, and otherwise ignore them.
285 if (!row) {
286 std::cout << "Failed to append row: " << row.status().message()
287 << "\n";
288 return;
289 }
290 std::cout << "Successfully appended to " << row->row_key() << "\n";
291 })
292 .get(); // block to simplify example.
293 }
294 //! [async read modify write]
295 (std::move(table), std::move(cq), argv.at(0));
296 }
297
RunAll(std::vector<std::string> const & argv)298 void RunAll(std::vector<std::string> const& argv) {
299 namespace examples = ::google::cloud::bigtable::examples;
300 namespace cbt = google::cloud::bigtable;
301
302 if (!argv.empty()) throw examples::Usage{"auto"};
303 examples::CheckEnvironmentVariablesAreSet({
304 "GOOGLE_CLOUD_PROJECT",
305 "GOOGLE_CLOUD_CPP_BIGTABLE_TEST_INSTANCE_ID",
306 });
307 auto const project_id =
308 google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value();
309 auto const instance_id = google::cloud::internal::GetEnv(
310 "GOOGLE_CLOUD_CPP_BIGTABLE_TEST_INSTANCE_ID")
311 .value();
312
313 cbt::TableAdmin admin(
314 cbt::CreateDefaultAdminClient(project_id, cbt::ClientOptions{}),
315 instance_id);
316
317 // If a previous run of these samples crashes before cleaning up there may be
318 // old tables left over. As there are quotas on the total number of tables we
319 // remove stale tables after 48 hours.
320 examples::CleanupOldTables("data-async-", admin);
321
322 // Initialize a generator with some amount of entropy.
323 auto generator = google::cloud::internal::DefaultPRNG(std::random_device{}());
324 auto const table_id = examples::RandomTableId("data-async-", generator);
325
326 std::cout << "\nCreating table to run the examples (" << table_id << ")"
327 << std::endl;
328 auto schema = admin.CreateTable(
329 table_id,
330 cbt::TableConfig({{"fam", cbt::GcRule::MaxNumVersions(10)}}, {}));
331 if (!schema) throw std::runtime_error(schema.status().message());
332
333 google::cloud::bigtable::Table table(
334 google::cloud::bigtable::CreateDefaultDataClient(
335 admin.project(), admin.instance_id(),
336 google::cloud::bigtable::ClientOptions()),
337 table_id, cbt::AlwaysRetryMutationPolicy());
338
339 google::cloud::CompletionQueue cq;
340 std::thread th([&cq] { cq.Run(); });
341 examples::AutoShutdownCQ shutdown(cq, std::move(th));
342
343 std::cout << "\nRunning the AsyncApply() example" << std::endl;
344 AsyncApply(table, cq, {"row-0001"});
345
346 std::cout << "\nRunning the AsyncBulkApply() example" << std::endl;
347 AsyncBulkApply(table, cq, {});
348
349 std::cout << "\nRunning the AsyncReadRows() example" << std::endl;
350 AsyncReadRows(table, cq, {});
351
352 std::cout << "\nRunning the AsyncReadRows() example" << std::endl;
353 AsyncReadRowsWithLimit(table, cq, {});
354
355 std::cout << "\nRunning the AsyncReadRow() example [1]" << std::endl;
356 AsyncReadRow(table, cq, {"row-0001"});
357
358 std::cout << "\nRunning the AsyncReadRow() example [2]" << std::endl;
359 AsyncReadRow(table, cq, {"row-not-found-key"});
360
361 std::cout << "\nRunning the AsyncApply() example [2]" << std::endl;
362 AsyncApply(table, cq, {"check-and-mutate-row-key"});
363
364 std::cout << "\nRunning the AsyncCheckAndMutate() example" << std::endl;
365 AsyncCheckAndMutate(table, cq, {"check-and-mutate-row-key"});
366
367 std::cout << "\nRunning the AsyncApply() example [3]" << std::endl;
368 AsyncApply(table, cq, {"read-modify-write-row-key"});
369
370 std::cout << "\nRunning the AsyncReadModifyWrite() example" << std::endl;
371 AsyncReadModifyWrite(table, cq, {"read-modify-write-row-key"});
372
373 (void)admin.DeleteTable(table_id);
374 }
375
376 } // anonymous namespace
377
main(int argc,char * argv[])378 int main(int argc, char* argv[]) {
379 google::cloud::testing_util::InstallCrashHandler(argv[0]);
380
381 using google::cloud::bigtable::examples::MakeCommandEntry;
382 google::cloud::bigtable::examples::Example example({
383 MakeCommandEntry("async-apply", {"<row-key>"}, AsyncApply),
384 MakeCommandEntry("async-bulk-apply", {}, AsyncBulkApply),
385 MakeCommandEntry("async-read-rows", {}, AsyncReadRows),
386 MakeCommandEntry("async-read-rows-with-limit", {},
387 AsyncReadRowsWithLimit),
388 MakeCommandEntry("async-read-row", {"<row-key>"}, AsyncReadRow),
389 MakeCommandEntry("async-check-and-mutate", {"<row-key>"},
390 AsyncCheckAndMutate),
391 MakeCommandEntry("async-read-modify-write", {}, AsyncReadModifyWrite),
392 {"auto", RunAll},
393 });
394 return example.Run(argc, argv);
395 }
396