1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/bigtable/examples/bigtable_examples_common.h"
16 #include "google/cloud/bigtable/table.h"
17 #include "google/cloud/internal/getenv.h"
18 #include "google/cloud/internal/random.h"
19 #include "google/cloud/testing_util/crash_handler.h"
20 #include <sstream>
21 
22 namespace {
23 
AsyncApply(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const & argv)24 void AsyncApply(google::cloud::bigtable::Table table,
25                 google::cloud::bigtable::CompletionQueue cq,
26                 std::vector<std::string> const& argv) {
27   //! [async-apply]
28   namespace cbt = google::cloud::bigtable;
29   using google::cloud::future;
30   using google::cloud::StatusOr;
31   [](cbt::Table table, cbt::CompletionQueue cq, std::string const& row_key) {
32     auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
33         std::chrono::system_clock::now().time_since_epoch());
34 
35     cbt::SingleRowMutation mutation(row_key);
36     mutation.emplace_back(
37         cbt::SetCell("fam", "column0", timestamp, "value for column0"));
38     mutation.emplace_back(
39         cbt::SetCell("fam", "column1", timestamp, "value for column1"));
40 
41     future<google::cloud::Status> status_future =
42         table.AsyncApply(std::move(mutation), cq);
43     auto status = status_future.get();
44     if (!status.ok()) throw std::runtime_error(status.message());
45     std::cout << "Successfully applied mutation\n";
46   }
47   //! [async-apply]
48   (std::move(table), std::move(cq), argv.at(0));
49 }
50 
AsyncBulkApply(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const &)51 void AsyncBulkApply(google::cloud::bigtable::Table table,
52                     google::cloud::bigtable::CompletionQueue cq,
53                     std::vector<std::string> const&) {
54   //! [bulk async-bulk-apply]
55   namespace cbt = google::cloud::bigtable;
56   using google::cloud::future;
57   [](cbt::Table table, cbt::CompletionQueue cq) {
58     // Write several rows in a single operation, each row has some trivial data.
59     cbt::BulkMutation bulk;
60     for (int i = 0; i != 5000; ++i) {
61       // Note: This example uses sequential numeric IDs for simplicity, but
62       // this can result in poor performance in a production application.
63       // Since rows are stored in sorted order by key, sequential keys can
64       // result in poor distribution of operations across nodes.
65       //
66       // For more information about how to design a Bigtable schema for the
67       // best performance, see the documentation:
68       //
69       //     https://cloud.google.com/bigtable/docs/schema-design
70       char buf[32];
71       snprintf(buf, sizeof(buf), "key-%06d", i);
72       cbt::SingleRowMutation mutation(buf);
73       mutation.emplace_back(
74           cbt::SetCell("fam", "col0", "value0-" + std::to_string(i)));
75       mutation.emplace_back(
76           cbt::SetCell("fam", "col1", "value2-" + std::to_string(i)));
77       mutation.emplace_back(
78           cbt::SetCell("fam", "col2", "value3-" + std::to_string(i)));
79       mutation.emplace_back(
80           cbt::SetCell("fam", "col3", "value4-" + std::to_string(i)));
81       bulk.emplace_back(std::move(mutation));
82     }
83 
84     table.AsyncBulkApply(std::move(bulk), cq)
85         .then([](future<std::vector<cbt::FailedMutation>> ft) {
86           auto failures = ft.get();
87           if (failures.empty()) {
88             std::cout << "All the mutations were successful\n";
89             return;
90           }
91           // By default, the `table` object uses the
92           // `SafeIdempotentMutationPolicy` which does not retry if any of the
93           // mutations fails and are not idempotent. In this example we simply
94           // print such failures, if any, and ignore them otherwise.
95           std::cerr << "The following mutations failed and were not retried:\n";
96           for (auto const& f : failures) {
97             std::cerr << "index[" << f.original_index() << "]=" << f.status()
98                       << "\n";
99           }
100         })
101         .get();  // block to simplify the example
102   }
103   //! [bulk async-bulk-apply]
104   (std::move(table), std::move(cq));
105 }
106 
AsyncReadRows(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const &)107 void AsyncReadRows(google::cloud::bigtable::Table table,
108                    google::cloud::bigtable::CompletionQueue cq,
109                    std::vector<std::string> const&) {
110   //! [async read rows]
111   namespace cbt = google::cloud::bigtable;
112   using google::cloud::make_ready_future;
113   using google::cloud::promise;
114   using google::cloud::Status;
115   [](cbt::CompletionQueue cq, cbt::Table table) {
116     // Create the range of rows to read.
117     auto range = cbt::RowRange::Range("key-000010", "key-000020");
118     // Filter the results, only include values from the "col0" column in the
119     // "fam" column family, and only get the latest value.
120     auto filter = cbt::Filter::Chain(
121         cbt::Filter::ColumnRangeClosed("fam", "col0", "col0"),
122         cbt::Filter::Latest(1));
123     promise<Status> stream_status_promise;
124     // Read and print the rows.
125     table.AsyncReadRows(
126         cq,
127         [](cbt::Row const& row) {
128           if (row.cells().size() != 1) {
129             std::cout << "Unexpected number of cells in " << row.row_key()
130                       << "\n";
131             return make_ready_future(false);
132           }
133           auto const& cell = row.cells().at(0);
134           std::cout << cell.row_key() << " = [" << cell.value() << "]\n";
135           return make_ready_future(true);
136         },
137         [&stream_status_promise](Status const& stream_status) {
138           stream_status_promise.set_value(stream_status);
139         },
140         range, filter);
141     Status stream_status = stream_status_promise.get_future().get();
142     if (!stream_status.ok()) throw std::runtime_error(stream_status.message());
143   }
144   //! [async read rows]
145   (std::move(cq), std::move(table));
146 }
147 
AsyncReadRowsWithLimit(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const &)148 void AsyncReadRowsWithLimit(google::cloud::bigtable::Table table,
149                             google::cloud::bigtable::CompletionQueue cq,
150                             std::vector<std::string> const&) {
151   //! [async read rows with limit]
152   namespace cbt = google::cloud::bigtable;
153   using google::cloud::make_ready_future;
154   using google::cloud::promise;
155   using google::cloud::Status;
156   [](cbt::CompletionQueue cq, cbt::Table table) {
157     // Create the range of rows to read.
158     auto range = cbt::RowRange::Range("key-000010", "key-000020");
159     // Filter the results, only include values from the "col0" column in the
160     // "fam" column family, and only get the latest value.
161     auto filter = cbt::Filter::Chain(
162         cbt::Filter::ColumnRangeClosed("fam", "col0", "col0"),
163         cbt::Filter::Latest(1));
164     promise<Status> stream_status_promise;
165     // Read and print the rows.
166     table.AsyncReadRows(
167         cq,
168         [](cbt::Row const& row) {
169           if (row.cells().size() != 1) {
170             std::cout << "Unexpected number of cells in " << row.row_key()
171                       << "\n";
172             return make_ready_future(false);
173           }
174           auto const& cell = row.cells().at(0);
175           std::cout << cell.row_key() << " = [" << cell.value() << "]\n";
176           return make_ready_future(true);
177         },
178         [&stream_status_promise](Status const& stream_status) {
179           stream_status_promise.set_value(stream_status);
180         },
181         range, filter);
182     Status stream_status = stream_status_promise.get_future().get();
183     if (!stream_status.ok()) throw std::runtime_error(stream_status.message());
184   }
185   //! [async read rows with limit]
186   (std::move(cq), std::move(table));
187 }
188 
AsyncReadRow(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const & argv)189 void AsyncReadRow(google::cloud::bigtable::Table table,
190                   google::cloud::bigtable::CompletionQueue cq,
191                   std::vector<std::string> const& argv) {
192   //! [async read row]
193   namespace cbt = google::cloud::bigtable;
194   using google::cloud::future;
195   using google::cloud::StatusOr;
196   [](cbt::CompletionQueue cq, google::cloud::bigtable::Table table,
197      std::string const& row_key) {
198     // Filter the results, only include the latest value on each cell.
199     cbt::Filter filter = cbt::Filter::Latest(1);
200     table.AsyncReadRow(cq, row_key, std::move(filter))
201         .then(
202             [row_key](future<StatusOr<std::pair<bool, cbt::Row>>> row_future) {
203               // Read a row, this returns a tuple (bool, row)
204               auto tuple = row_future.get();
205               if (!tuple) throw std::runtime_error(tuple.status().message());
206               if (!tuple->first) {
207                 std::cout << "Row " << row_key << " not found\n";
208                 return;
209               }
210               std::cout << "key: " << tuple->second.row_key() << "\n";
211               for (auto& cell : tuple->second.cells()) {
212                 std::cout << "    " << cell.family_name() << ":"
213                           << cell.column_qualifier() << " = <";
214                 if (cell.column_qualifier() == "counter") {
215                   // This example uses "counter" to store 64-bit numbers in
216                   // big-endian format, extract them as follows:
217                   std::cout
218                       << cell.decode_big_endian_integer<std::int64_t>().value();
219                 } else {
220                   std::cout << cell.value();
221                 }
222                 std::cout << ">\n";
223               }
224             })
225         .get();  // block to simplify the example
226   }
227   //! [async read row]
228   (std::move(cq), std::move(table), argv.at(0));
229 }
230 
AsyncCheckAndMutate(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const & argv)231 void AsyncCheckAndMutate(google::cloud::bigtable::Table table,
232                          google::cloud::bigtable::CompletionQueue cq,
233                          std::vector<std::string> const& argv) {
234   //! [async check and mutate]
235   namespace cbt = google::cloud::bigtable;
236   using google::cloud::future;
237   using google::cloud::StatusOr;
238   [](cbt::Table table, cbt::CompletionQueue cq, std::string const& row_key) {
239     // Check if the latest value of the flip-flop column is "on".
240     cbt::Filter predicate = cbt::Filter::Chain(
241         cbt::Filter::ColumnRangeClosed("fam", "flip-flop", "flip-flop"),
242         cbt::Filter::Latest(1), cbt::Filter::ValueRegex("on"));
243     future<StatusOr<cbt::MutationBranch>> branch_future =
244         table.AsyncCheckAndMutateRow(row_key, std::move(predicate),
245                                      {cbt::SetCell("fam", "flip-flop", "off"),
246                                       cbt::SetCell("fam", "flop-flip", "on")},
247                                      {cbt::SetCell("fam", "flip-flop", "on"),
248                                       cbt::SetCell("fam", "flop-flip", "off")},
249                                      cq);
250 
251     branch_future
252         .then([](future<StatusOr<cbt::MutationBranch>> f) {
253           auto response = f.get();
254           if (!response) throw std::runtime_error(response.status().message());
255           if (*response == cbt::MutationBranch::kPredicateMatched) {
256             std::cout << "The predicate was matched\n";
257           } else {
258             std::cout << "The predicate was not matched\n";
259           }
260         })
261         .get();  // block to simplify the example.
262   }
263   //! [async check and mutate]
264   (std::move(table), std::move(cq), argv.at(0));
265 }
266 
AsyncReadModifyWrite(google::cloud::bigtable::Table table,google::cloud::bigtable::CompletionQueue cq,std::vector<std::string> const & argv)267 void AsyncReadModifyWrite(google::cloud::bigtable::Table table,
268                           google::cloud::bigtable::CompletionQueue cq,
269                           std::vector<std::string> const& argv) {
270   //! [async read modify write]
271   namespace cbt = google::cloud::bigtable;
272   using google::cloud::future;
273   using google::cloud::StatusOr;
274   [](cbt::Table table, cbt::CompletionQueue cq, std::string const& row_key) {
275     future<StatusOr<cbt::Row>> row_future = table.AsyncReadModifyWriteRow(
276         std::move(row_key), cq,
277         cbt::ReadModifyWriteRule::AppendValue("fam", "list", ";element"));
278 
279     row_future
280         .then([](future<StatusOr<cbt::Row>> f) {
281           auto row = f.get();
282           // As the modify in this example is not idempotent, and this example
283           // does not attempt to retry if there is a failure, we simply print
284           // such failures, if any, and otherwise ignore them.
285           if (!row) {
286             std::cout << "Failed to append row: " << row.status().message()
287                       << "\n";
288             return;
289           }
290           std::cout << "Successfully appended to " << row->row_key() << "\n";
291         })
292         .get();  // block to simplify example.
293   }
294   //! [async read modify write]
295   (std::move(table), std::move(cq), argv.at(0));
296 }
297 
RunAll(std::vector<std::string> const & argv)298 void RunAll(std::vector<std::string> const& argv) {
299   namespace examples = ::google::cloud::bigtable::examples;
300   namespace cbt = google::cloud::bigtable;
301 
302   if (!argv.empty()) throw examples::Usage{"auto"};
303   examples::CheckEnvironmentVariablesAreSet({
304       "GOOGLE_CLOUD_PROJECT",
305       "GOOGLE_CLOUD_CPP_BIGTABLE_TEST_INSTANCE_ID",
306   });
307   auto const project_id =
308       google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value();
309   auto const instance_id = google::cloud::internal::GetEnv(
310                                "GOOGLE_CLOUD_CPP_BIGTABLE_TEST_INSTANCE_ID")
311                                .value();
312 
313   cbt::TableAdmin admin(
314       cbt::CreateDefaultAdminClient(project_id, cbt::ClientOptions{}),
315       instance_id);
316 
317   // If a previous run of these samples crashes before cleaning up there may be
318   // old tables left over. As there are quotas on the total number of tables we
319   // remove stale tables after 48 hours.
320   examples::CleanupOldTables("data-async-", admin);
321 
322   // Initialize a generator with some amount of entropy.
323   auto generator = google::cloud::internal::DefaultPRNG(std::random_device{}());
324   auto const table_id = examples::RandomTableId("data-async-", generator);
325 
326   std::cout << "\nCreating table to run the examples (" << table_id << ")"
327             << std::endl;
328   auto schema = admin.CreateTable(
329       table_id,
330       cbt::TableConfig({{"fam", cbt::GcRule::MaxNumVersions(10)}}, {}));
331   if (!schema) throw std::runtime_error(schema.status().message());
332 
333   google::cloud::bigtable::Table table(
334       google::cloud::bigtable::CreateDefaultDataClient(
335           admin.project(), admin.instance_id(),
336           google::cloud::bigtable::ClientOptions()),
337       table_id, cbt::AlwaysRetryMutationPolicy());
338 
339   google::cloud::CompletionQueue cq;
340   std::thread th([&cq] { cq.Run(); });
341   examples::AutoShutdownCQ shutdown(cq, std::move(th));
342 
343   std::cout << "\nRunning the AsyncApply() example" << std::endl;
344   AsyncApply(table, cq, {"row-0001"});
345 
346   std::cout << "\nRunning the AsyncBulkApply() example" << std::endl;
347   AsyncBulkApply(table, cq, {});
348 
349   std::cout << "\nRunning the AsyncReadRows() example" << std::endl;
350   AsyncReadRows(table, cq, {});
351 
352   std::cout << "\nRunning the AsyncReadRows() example" << std::endl;
353   AsyncReadRowsWithLimit(table, cq, {});
354 
355   std::cout << "\nRunning the AsyncReadRow() example [1]" << std::endl;
356   AsyncReadRow(table, cq, {"row-0001"});
357 
358   std::cout << "\nRunning the AsyncReadRow() example [2]" << std::endl;
359   AsyncReadRow(table, cq, {"row-not-found-key"});
360 
361   std::cout << "\nRunning the AsyncApply() example [2]" << std::endl;
362   AsyncApply(table, cq, {"check-and-mutate-row-key"});
363 
364   std::cout << "\nRunning the AsyncCheckAndMutate() example" << std::endl;
365   AsyncCheckAndMutate(table, cq, {"check-and-mutate-row-key"});
366 
367   std::cout << "\nRunning the AsyncApply() example [3]" << std::endl;
368   AsyncApply(table, cq, {"read-modify-write-row-key"});
369 
370   std::cout << "\nRunning the AsyncReadModifyWrite() example" << std::endl;
371   AsyncReadModifyWrite(table, cq, {"read-modify-write-row-key"});
372 
373   (void)admin.DeleteTable(table_id);
374 }
375 
376 }  // anonymous namespace
377 
main(int argc,char * argv[])378 int main(int argc, char* argv[]) {
379   google::cloud::testing_util::InstallCrashHandler(argv[0]);
380 
381   using google::cloud::bigtable::examples::MakeCommandEntry;
382   google::cloud::bigtable::examples::Example example({
383       MakeCommandEntry("async-apply", {"<row-key>"}, AsyncApply),
384       MakeCommandEntry("async-bulk-apply", {}, AsyncBulkApply),
385       MakeCommandEntry("async-read-rows", {}, AsyncReadRows),
386       MakeCommandEntry("async-read-rows-with-limit", {},
387                        AsyncReadRowsWithLimit),
388       MakeCommandEntry("async-read-row", {"<row-key>"}, AsyncReadRow),
389       MakeCommandEntry("async-check-and-mutate", {"<row-key>"},
390                        AsyncCheckAndMutate),
391       MakeCommandEntry("async-read-modify-write", {}, AsyncReadModifyWrite),
392       {"auto", RunAll},
393   });
394   return example.Run(argc, argv);
395 }
396