1 // Copyright 2017 Google Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/bigtable/table_admin.h"
16 #include "google/cloud/bigtable/internal/async_retry_multi_page.h"
17 #include "google/cloud/bigtable/internal/async_retry_op.h"
18 #include "google/cloud/bigtable/internal/async_retry_unary_rpc_and_poll.h"
19 #include "google/cloud/bigtable/internal/unary_client_utils.h"
20 #include "google/cloud/grpc_error_delegate.h"
21 #include "google/cloud/internal/async_retry_unary_rpc.h"
22 #include "google/cloud/internal/retry_policy.h"
23 #include "google/cloud/internal/time_utils.h"
24 #include <google/protobuf/duration.pb.h>
25 #include <sstream>
26 
27 namespace btadmin = ::google::bigtable::admin::v2;
28 
29 namespace google {
30 namespace cloud {
31 namespace bigtable {
32 inline namespace BIGTABLE_CLIENT_NS {
// Compile-time checks: fail the build if bigtable::TableAdmin ever stops being
// copy constructible or copy assignable.
static_assert(std::is_copy_constructible<bigtable::TableAdmin>::value,
              "bigtable::TableAdmin must be constructible");
static_assert(std::is_copy_assignable<bigtable::TableAdmin>::value,
              "bigtable::TableAdmin must be assignable");

// Namespace-scope definitions for the `constexpr` static data members of
// TableAdmin. Before C++17 such members need a definition outside the class
// whenever they are odr-used (e.g. bound to a reference).
// NOLINTNEXTLINE(readability-identifier-naming)
constexpr TableAdmin::TableView TableAdmin::VIEW_UNSPECIFIED;
// NOLINTNEXTLINE(readability-identifier-naming)
constexpr TableAdmin::TableView TableAdmin::NAME_ONLY;
// NOLINTNEXTLINE(readability-identifier-naming)
constexpr TableAdmin::TableView TableAdmin::SCHEMA_VIEW;
// NOLINTNEXTLINE(readability-identifier-naming)
constexpr TableAdmin::TableView TableAdmin::REPLICATION_VIEW;
// NOLINTNEXTLINE(readability-identifier-naming)
constexpr TableAdmin::TableView TableAdmin::FULL;

/// Shortcuts to avoid typing long names over and over.
// `ClientUtils` provides the blocking `MakeCall()` / `MakeNonIdempotentCall()`
// helpers used by the synchronous member functions in this file.
using ClientUtils = bigtable::internal::UnaryClientUtils<AdminClient>;
// `Idempotency` is the value passed to the retry helpers to state whether an
// operation is treated as idempotent.
using google::cloud::internal::Idempotency;
52 
CreateTable(std::string table_id,TableConfig config)53 StatusOr<btadmin::Table> TableAdmin::CreateTable(std::string table_id,
54                                                  TableConfig config) {
55   grpc::Status status;
56 
57   auto request = std::move(config).as_proto();
58   request.set_parent(instance_name());
59   request.set_table_id(std::move(table_id));
60 
61   // This is a non-idempotent API, use the correct retry loop for this type of
62   // operation.
63   auto result = ClientUtils::MakeNonIdempotentCall(
64       *client_, clone_rpc_retry_policy(), clone_metadata_update_policy(),
65       &AdminClient::CreateTable, request, "CreateTable", status);
66 
67   if (!status.ok()) {
68     return google::cloud::MakeStatusFromRpcError(status);
69   }
70   return result;
71 }
72 
AsyncCreateTable(CompletionQueue & cq,std::string table_id,TableConfig config)73 future<StatusOr<btadmin::Table>> TableAdmin::AsyncCreateTable(
74     CompletionQueue& cq, std::string table_id, TableConfig config) {
75   btadmin::CreateTableRequest request = std::move(config).as_proto();
76   request.set_parent(instance_name());
77   request.set_table_id(std::move(table_id));
78 
79   auto client = client_;
80   auto metadata_update_policy = clone_metadata_update_policy();
81   return google::cloud::internal::StartRetryAsyncUnaryRpc(
82       cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
83       Idempotency::kNonIdempotent,
84       [client, metadata_update_policy](
85           grpc::ClientContext* context,
86           btadmin::CreateTableRequest const& request,
87           grpc::CompletionQueue* cq) {
88         metadata_update_policy.Setup(*context);
89         return client->AsyncCreateTable(context, request, cq);
90       },
91       std::move(request));
92 }
93 
AsyncGetTable(CompletionQueue & cq,std::string const & table_id,btadmin::Table::View view)94 future<StatusOr<google::bigtable::admin::v2::Table>> TableAdmin::AsyncGetTable(
95     CompletionQueue& cq, std::string const& table_id,
96     btadmin::Table::View view) {
97   google::bigtable::admin::v2::GetTableRequest request{};
98   auto name = TableName(table_id);
99   request.set_name(name);
100   request.set_view(view);
101 
102   // Copy the client because we lack C++14 extended lambda captures.
103   auto client = client_;
104   auto metadata_update_policy = clone_metadata_update_policy();
105   return google::cloud::internal::StartRetryAsyncUnaryRpc(
106       cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
107       Idempotency::kIdempotent,
108       [client, metadata_update_policy](
109           grpc::ClientContext* context,
110           google::bigtable::admin::v2::GetTableRequest const& request,
111           grpc::CompletionQueue* cq) {
112         metadata_update_policy.Setup(*context);
113         return client->AsyncGetTable(context, request, cq);
114       },
115       std::move(request));
116 }
117 
ListTables(btadmin::Table::View view)118 StatusOr<std::vector<btadmin::Table>> TableAdmin::ListTables(
119     btadmin::Table::View view) {
120   grpc::Status status;
121 
122   // Copy the policies in effect for the operation.
123   auto rpc_policy = clone_rpc_retry_policy();
124   auto backoff_policy = clone_rpc_backoff_policy();
125 
126   // Build the RPC request, try to minimize copying.
127   std::vector<btadmin::Table> result;
128   std::string page_token;
129   do {
130     btadmin::ListTablesRequest request;
131     request.set_page_token(std::move(page_token));
132     request.set_parent(instance_name());
133     request.set_view(view);
134 
135     auto response = ClientUtils::MakeCall(
136         *client_, *rpc_policy, *backoff_policy, clone_metadata_update_policy(),
137         &AdminClient::ListTables, request, "TableAdmin", status,
138         Idempotency::kIdempotent);
139 
140     if (!status.ok()) {
141       return google::cloud::MakeStatusFromRpcError(status);
142     }
143 
144     for (auto& x : *response.mutable_tables()) {
145       result.emplace_back(std::move(x));
146     }
147     page_token = std::move(*response.mutable_next_page_token());
148   } while (!page_token.empty());
149   return result;
150 }
151 
AsyncListTables(CompletionQueue & cq,btadmin::Table::View view)152 future<StatusOr<std::vector<btadmin::Table>>> TableAdmin::AsyncListTables(
153     CompletionQueue& cq, btadmin::Table::View view) {
154   auto client = client_;
155   btadmin::ListTablesRequest request;
156   request.set_parent(instance_name());
157   request.set_view(view);
158 
159   return internal::StartAsyncRetryMultiPage(
160       __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
161       clone_metadata_update_policy(),
162       [client](grpc::ClientContext* context,
163                btadmin::ListTablesRequest const& request,
164                grpc::CompletionQueue* cq) {
165         return client->AsyncListTables(context, request, cq);
166       },
167       std::move(request), std::vector<btadmin::Table>(),
168       [](std::vector<btadmin::Table> acc,
169          btadmin::ListTablesResponse const& response) {
170         std::move(response.tables().begin(), response.tables().end(),
171                   std::back_inserter(acc));
172         return acc;
173       },
174       cq);
175 }
176 
GetTable(std::string const & table_id,btadmin::Table::View view)177 StatusOr<btadmin::Table> TableAdmin::GetTable(std::string const& table_id,
178                                               btadmin::Table::View view) {
179   grpc::Status status;
180   btadmin::GetTableRequest request;
181   auto name = TableName(table_id);
182   request.set_name(name);
183   request.set_view(view);
184 
185   MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
186 
187   auto result = ClientUtils::MakeCall(
188       *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
189       metadata_update_policy, &AdminClient::GetTable, request, "GetTable",
190       status, Idempotency::kIdempotent);
191   if (!status.ok()) {
192     return google::cloud::MakeStatusFromRpcError(status);
193   }
194 
195   return result;
196 }
197 
DeleteTable(std::string const & table_id)198 Status TableAdmin::DeleteTable(std::string const& table_id) {
199   grpc::Status status;
200   btadmin::DeleteTableRequest request;
201   auto name = TableName(table_id);
202   request.set_name(name);
203 
204   MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
205 
206   // This is a non-idempotent API, use the correct retry loop for this type of
207   // operation.
208   ClientUtils::MakeNonIdempotentCall(
209       *client_, clone_rpc_retry_policy(), metadata_update_policy,
210       &AdminClient::DeleteTable, request, "DeleteTable", status);
211 
212   return google::cloud::MakeStatusFromRpcError(status);
213 }
214 
AsyncDeleteTable(CompletionQueue & cq,std::string const & table_id)215 future<Status> TableAdmin::AsyncDeleteTable(CompletionQueue& cq,
216                                             std::string const& table_id) {
217   grpc::Status status;
218   btadmin::DeleteTableRequest request;
219   auto name = TableName(table_id);
220   request.set_name(name);
221 
222   auto client = client_;
223   return google::cloud::internal::StartRetryAsyncUnaryRpc(
224              cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
225              Idempotency::kIdempotent,
226              [client, name](
227                  grpc::ClientContext* context,
228                  google::bigtable::admin::v2::DeleteTableRequest const& request,
229                  grpc::CompletionQueue* cq) {
230                MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
231                    .Setup(*context);
232                return client->AsyncDeleteTable(context, request, cq);
233              },
234              std::move(request))
235       .then([](future<StatusOr<google::protobuf::Empty>> r) {
236         return r.get().status();
237       });
238 }
239 
240 google::bigtable::admin::v2::CreateBackupRequest
AsProto(std::string instance_name) const241 TableAdmin::CreateBackupParams::AsProto(std::string instance_name) const {
242   google::bigtable::admin::v2::CreateBackupRequest proto;
243   proto.set_parent(instance_name + "/clusters/" + cluster_id);
244   proto.set_backup_id(backup_id);
245   proto.mutable_backup()->set_source_table(std::move(instance_name) +
246                                            "/tables/" + table_name);
247   *proto.mutable_backup()->mutable_expire_time() =
248       google::cloud::internal::ToProtoTimestamp(expire_time);
249   return proto;
250 }
251 
CreateBackup(CreateBackupParams const & params)252 StatusOr<google::bigtable::admin::v2::Backup> TableAdmin::CreateBackup(
253     CreateBackupParams const& params) {
254   CompletionQueue cq;
255   std::thread([](CompletionQueue cq) { cq.Run(); }, cq).detach();
256   return AsyncCreateBackup(cq, params)
257       .then(
258           [cq](
259               future<StatusOr<google::bigtable::admin::v2::Backup>> f) mutable {
260             cq.Shutdown();
261             return f.get();
262           })
263       .get();
264 }
265 
266 future<StatusOr<google::bigtable::admin::v2::Backup>>
AsyncCreateBackup(CompletionQueue & cq,CreateBackupParams const & params)267 TableAdmin::AsyncCreateBackup(CompletionQueue& cq,
268                               CreateBackupParams const& params) {
269   auto request = params.AsProto(instance_name());
270   MetadataUpdatePolicy metadata_update_policy(request.parent(),
271                                               MetadataParamTypes::PARENT);
272   auto client = client_;
273   return internal::AsyncStartPollAfterRetryUnaryRpc<
274       google::bigtable::admin::v2::Backup>(
275       __func__, clone_polling_policy(), clone_rpc_retry_policy(),
276       clone_rpc_backoff_policy(),
277       internal::ConstantIdempotencyPolicy(Idempotency::kNonIdempotent),
278       metadata_update_policy, client,
279       [client](grpc::ClientContext* context,
280                google::bigtable::admin::v2::CreateBackupRequest const& request,
281                grpc::CompletionQueue* cq) {
282         return client->AsyncCreateBackup(context, request, cq);
283       },
284       std::move(request), cq);
285 }
286 
GetBackup(std::string const & cluster_id,std::string const & backup_id)287 StatusOr<google::bigtable::admin::v2::Backup> TableAdmin::GetBackup(
288     std::string const& cluster_id, std::string const& backup_id) {
289   grpc::Status status;
290   btadmin::GetBackupRequest request;
291   std::string name =
292       instance_name() + "/clusters/" + cluster_id + "/backups/" + backup_id;
293   request.set_name(name);
294 
295   MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
296 
297   auto result = ClientUtils::MakeCall(
298       *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
299       metadata_update_policy, &AdminClient::GetBackup, request, "GetBackup",
300       status, Idempotency::kIdempotent);
301   if (!status.ok()) {
302     return google::cloud::MakeStatusFromRpcError(status);
303   }
304 
305   return result;
306 }
307 
308 future<StatusOr<google::bigtable::admin::v2::Backup>>
AsyncGetBackup(CompletionQueue & cq,std::string const & cluster_id,std::string const & backup_id)309 TableAdmin::AsyncGetBackup(CompletionQueue& cq, std::string const& cluster_id,
310                            std::string const& backup_id) {
311   google::bigtable::admin::v2::GetBackupRequest request{};
312   std::string name =
313       instance_name() + "/clusters/" + cluster_id + "/backups/" + backup_id;
314   request.set_name(name);
315 
316   // Copy the client because we lack C++14 extended lambda captures.
317   auto client = client_;
318   auto metadata_update_policy = clone_metadata_update_policy();
319   return google::cloud::internal::StartRetryAsyncUnaryRpc(
320       cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
321       Idempotency::kIdempotent,
322       [client, metadata_update_policy](
323           grpc::ClientContext* context,
324           google::bigtable::admin::v2::GetBackupRequest const& request,
325           grpc::CompletionQueue* cq) {
326         metadata_update_policy.Setup(*context);
327         return client->AsyncGetBackup(context, request, cq);
328       },
329       std::move(request));
330 }
331 
332 google::bigtable::admin::v2::UpdateBackupRequest
AsProto(std::string const & instance_name) const333 TableAdmin::UpdateBackupParams::AsProto(
334     std::string const& instance_name) const {
335   google::bigtable::admin::v2::UpdateBackupRequest proto;
336   proto.mutable_backup()->set_name(instance_name + "/clusters/" + cluster_id +
337                                    "/backups/" + backup_name);
338   *proto.mutable_backup()->mutable_expire_time() =
339       google::cloud::internal::ToProtoTimestamp(expire_time);
340   proto.mutable_update_mask()->add_paths("expire_time");
341   return proto;
342 }
343 
UpdateBackup(UpdateBackupParams const & params)344 StatusOr<google::bigtable::admin::v2::Backup> TableAdmin::UpdateBackup(
345     UpdateBackupParams const& params) {
346   grpc::Status status;
347   btadmin::UpdateBackupRequest request = params.AsProto(instance_name());
348 
349   MetadataUpdatePolicy metadata_update_policy(request.backup().name(),
350                                               MetadataParamTypes::BACKUP_NAME);
351 
352   auto result = ClientUtils::MakeCall(
353       *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
354       metadata_update_policy, &AdminClient::UpdateBackup, request,
355       "UpdateBackup", status, Idempotency::kIdempotent);
356   if (!status.ok()) {
357     return google::cloud::MakeStatusFromRpcError(status);
358   }
359 
360   return result;
361 }
362 
363 future<StatusOr<google::bigtable::admin::v2::Backup>>
AsyncUpdateBackup(CompletionQueue & cq,UpdateBackupParams const & params)364 TableAdmin::AsyncUpdateBackup(CompletionQueue& cq,
365                               UpdateBackupParams const& params) {
366   btadmin::UpdateBackupRequest request = params.AsProto(instance_name());
367 
368   // Copy the client because we lack C++14 extended lambda captures.
369   auto client = client_;
370   auto metadata_update_policy = clone_metadata_update_policy();
371   return google::cloud::internal::StartRetryAsyncUnaryRpc(
372       cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
373       Idempotency::kIdempotent,
374       [client, metadata_update_policy](
375           grpc::ClientContext* context,
376           google::bigtable::admin::v2::UpdateBackupRequest const& request,
377           grpc::CompletionQueue* cq) {
378         metadata_update_policy.Setup(*context);
379         return client->AsyncUpdateBackup(context, request, cq);
380       },
381       std::move(request));
382 }
383 
DeleteBackup(google::bigtable::admin::v2::Backup const & backup)384 Status TableAdmin::DeleteBackup(
385     google::bigtable::admin::v2::Backup const& backup) {
386   grpc::Status status;
387   btadmin::DeleteBackupRequest request;
388   request.set_name(backup.name());
389   MetadataUpdatePolicy metadata_update_policy(request.name(),
390                                               MetadataParamTypes::NAME);
391   auto result = ClientUtils::MakeCall(
392       *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
393       metadata_update_policy, &AdminClient::DeleteBackup, request,
394       "DeleteBackup", status, Idempotency::kIdempotent);
395   if (!status.ok()) {
396     return google::cloud::MakeStatusFromRpcError(status);
397   }
398 
399   return {};
400 }
401 
AsyncDeleteBackup(CompletionQueue & cq,google::bigtable::admin::v2::Backup const & backup)402 future<Status> TableAdmin::AsyncDeleteBackup(
403     CompletionQueue& cq, google::bigtable::admin::v2::Backup const& backup) {
404   grpc::Status status;
405   btadmin::DeleteBackupRequest request;
406   request.set_name(backup.name());
407   auto client = client_;
408   auto metadata_update_policy = clone_metadata_update_policy();
409   return google::cloud::internal::StartRetryAsyncUnaryRpc(
410              cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
411              Idempotency::kIdempotent,
412              [client, metadata_update_policy](
413                  grpc::ClientContext* context,
414                  google::bigtable::admin::v2::DeleteBackupRequest const&
415                      request,
416                  grpc::CompletionQueue* cq) {
417                metadata_update_policy.Setup(*context);
418 
419                return client->AsyncDeleteBackup(context, request, cq);
420              },
421              std::move(request))
422       .then([](future<StatusOr<google::protobuf::Empty>> r) {
423         return r.get().status();
424       });
425 }
426 
DeleteBackup(std::string const & cluster_id,std::string const & backup_id)427 Status TableAdmin::DeleteBackup(std::string const& cluster_id,
428                                 std::string const& backup_id) {
429   grpc::Status status;
430   btadmin::DeleteBackupRequest request;
431   request.set_name(instance_name() + "/clusters/" + cluster_id + "/backups/" +
432                    backup_id);
433 
434   MetadataUpdatePolicy metadata_update_policy(request.name(),
435                                               MetadataParamTypes::NAME);
436 
437   auto result = ClientUtils::MakeCall(
438       *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
439       metadata_update_policy, &AdminClient::DeleteBackup, request,
440       "DeleteBackup", status, Idempotency::kIdempotent);
441   if (!status.ok()) {
442     return google::cloud::MakeStatusFromRpcError(status);
443   }
444 
445   return {};
446 }
447 
AsyncDeleteBackup(CompletionQueue & cq,std::string const & cluster_id,std::string const & backup_id)448 future<Status> TableAdmin::AsyncDeleteBackup(CompletionQueue& cq,
449                                              std::string const& cluster_id,
450                                              std::string const& backup_id) {
451   grpc::Status status;
452   btadmin::DeleteBackupRequest request;
453   request.set_name(instance_name() + "/clusters/" + cluster_id + "/backups/" +
454                    backup_id);
455   auto client = client_;
456   auto metadata_update_policy = clone_metadata_update_policy();
457   return google::cloud::internal::StartRetryAsyncUnaryRpc(
458              cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
459              Idempotency::kIdempotent,
460              [client, metadata_update_policy](
461                  grpc::ClientContext* context,
462                  google::bigtable::admin::v2::DeleteBackupRequest const&
463                      request,
464                  grpc::CompletionQueue* cq) {
465                metadata_update_policy.Setup(*context);
466 
467                return client->AsyncDeleteBackup(context, request, cq);
468              },
469              std::move(request))
470       .then([](future<StatusOr<google::protobuf::Empty>> r) {
471         return r.get().status();
472       });
473 }
474 
475 google::bigtable::admin::v2::ListBackupsRequest
AsProto(std::string const & instance_name) const476 TableAdmin::ListBackupsParams::AsProto(std::string const& instance_name) const {
477   google::bigtable::admin::v2::ListBackupsRequest proto;
478   proto.set_parent(cluster_id ? instance_name + "/clusters/" + *cluster_id
479                               : instance_name + "/clusters/-");
480   if (filter) *proto.mutable_filter() = *filter;
481   if (order_by) *proto.mutable_order_by() = *order_by;
482   return proto;
483 }
484 
485 StatusOr<std::vector<google::bigtable::admin::v2::Backup>>
ListBackups(ListBackupsParams const & params)486 TableAdmin::ListBackups(ListBackupsParams const& params) {
487   grpc::Status status;
488 
489   // Copy the policies in effect for the operation.
490   auto rpc_policy = clone_rpc_retry_policy();
491   auto backoff_policy = clone_rpc_backoff_policy();
492 
493   // Build the RPC request, try to minimize copying.
494   std::vector<btadmin::Backup> result;
495   btadmin::ListBackupsRequest request = params.AsProto(instance_name());
496 
497   MetadataUpdatePolicy metadata_update_policy(request.parent(),
498                                               MetadataParamTypes::PARENT);
499 
500   std::string page_token;
501   do {
502     request.set_page_token(std::move(page_token));
503 
504     auto response = ClientUtils::MakeCall(
505         *client_, *rpc_policy, *backoff_policy, metadata_update_policy,
506         &AdminClient::ListBackups, request, "TableAdmin", status,
507         Idempotency::kIdempotent);
508 
509     if (!status.ok()) {
510       return google::cloud::MakeStatusFromRpcError(status);
511     }
512 
513     for (auto& x : *response.mutable_backups()) {
514       result.emplace_back(std::move(x));
515     }
516     page_token = std::move(*response.mutable_next_page_token());
517   } while (!page_token.empty());
518   return result;
519 }
520 
521 future<StatusOr<std::vector<google::bigtable::admin::v2::Backup>>>
AsyncListBackups(CompletionQueue & cq,ListBackupsParams const & params)522 TableAdmin::AsyncListBackups(CompletionQueue& cq,
523                              ListBackupsParams const& params) {
524   auto client = client_;
525   btadmin::ListBackupsRequest request = params.AsProto(instance_name());
526   return internal::StartAsyncRetryMultiPage(
527       __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
528       clone_metadata_update_policy(),
529       [client](grpc::ClientContext* context,
530                btadmin::ListBackupsRequest const& request,
531                grpc::CompletionQueue* cq) {
532         return client->AsyncListBackups(context, request, cq);
533       },
534       std::move(request), std::vector<btadmin::Backup>(),
535       [](std::vector<btadmin::Backup> acc,
536          btadmin::ListBackupsResponse const& response) {
537         std::move(response.backups().begin(), response.backups().end(),
538                   std::back_inserter(acc));
539         return acc;
540       },
541       cq);
542 }
543 
544 google::bigtable::admin::v2::RestoreTableRequest
AsProto(std::string const & instance_name) const545 TableAdmin::RestoreTableParams::AsProto(
546     std::string const& instance_name) const {
547   google::bigtable::admin::v2::RestoreTableRequest proto;
548   proto.set_parent(instance_name);
549   proto.set_table_id(table_id);
550   proto.set_backup(instance_name + "/clusters/" + cluster_id + "/backups/" +
551                    backup_id);
552   return proto;
553 }
554 
RestoreTable(RestoreTableParams const & params)555 StatusOr<google::bigtable::admin::v2::Table> TableAdmin::RestoreTable(
556     RestoreTableParams const& params) {
557   CompletionQueue cq;
558   std::thread([](CompletionQueue cq) { cq.Run(); }, cq).detach();
559   return AsyncRestoreTable(cq, params)
560       .then(
561           [cq](future<StatusOr<google::bigtable::admin::v2::Table>> f) mutable {
562             cq.Shutdown();
563             return f.get();
564           })
565       .get();
566 }
567 
568 future<StatusOr<google::bigtable::admin::v2::Table>>
AsyncRestoreTable(CompletionQueue & cq,RestoreTableParams const & params)569 TableAdmin::AsyncRestoreTable(CompletionQueue& cq,
570                               RestoreTableParams const& params) {
571   MetadataUpdatePolicy metadata_update_policy(instance_name(),
572                                               MetadataParamTypes::PARENT);
573   auto client = client_;
574   return internal::AsyncStartPollAfterRetryUnaryRpc<
575       google::bigtable::admin::v2::Table>(
576       __func__, clone_polling_policy(), clone_rpc_retry_policy(),
577       clone_rpc_backoff_policy(),
578       internal::ConstantIdempotencyPolicy(Idempotency::kNonIdempotent),
579       metadata_update_policy, client,
580       [client](grpc::ClientContext* context,
581                google::bigtable::admin::v2::RestoreTableRequest const& request,
582                grpc::CompletionQueue* cq) {
583         return client->AsyncRestoreTable(context, request, cq);
584       },
585       params.AsProto(instance_name()), cq);
586 }
587 
ModifyColumnFamilies(std::string const & table_id,std::vector<ColumnFamilyModification> modifications)588 StatusOr<btadmin::Table> TableAdmin::ModifyColumnFamilies(
589     std::string const& table_id,
590     std::vector<ColumnFamilyModification> modifications) {
591   grpc::Status status;
592 
593   btadmin::ModifyColumnFamiliesRequest request;
594   auto name = TableName(table_id);
595   request.set_name(name);
596   for (auto& m : modifications) {
597     *request.add_modifications() = std::move(m).as_proto();
598   }
599   MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
600   auto result = ClientUtils::MakeNonIdempotentCall(
601       *client_, clone_rpc_retry_policy(), metadata_update_policy,
602       &AdminClient::ModifyColumnFamilies, request, "ModifyColumnFamilies",
603       status);
604 
605   if (!status.ok()) {
606     return google::cloud::MakeStatusFromRpcError(status);
607   }
608   return result;
609 }
610 
AsyncModifyColumnFamilies(CompletionQueue & cq,std::string const & table_id,std::vector<ColumnFamilyModification> modifications)611 future<StatusOr<btadmin::Table>> TableAdmin::AsyncModifyColumnFamilies(
612     CompletionQueue& cq, std::string const& table_id,
613     std::vector<ColumnFamilyModification> modifications) {
614   btadmin::ModifyColumnFamiliesRequest request;
615   auto name = TableName(table_id);
616   request.set_name(name);
617   for (auto& m : modifications) {
618     *request.add_modifications() = std::move(m).as_proto();
619   }
620 
621   auto client = client_;
622   return google::cloud::internal::StartRetryAsyncUnaryRpc(
623       cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
624       Idempotency::kIdempotent,
625       [client, name](grpc::ClientContext* context,
626                      btadmin::ModifyColumnFamiliesRequest const& request,
627                      grpc::CompletionQueue* cq) {
628         MetadataUpdatePolicy(name, MetadataParamTypes::NAME).Setup(*context);
629         return client->AsyncModifyColumnFamilies(context, request, cq);
630       },
631       std::move(request));
632 }
633 
DropRowsByPrefix(std::string const & table_id,std::string row_key_prefix)634 Status TableAdmin::DropRowsByPrefix(std::string const& table_id,
635                                     std::string row_key_prefix) {
636   grpc::Status status;
637   btadmin::DropRowRangeRequest request;
638   auto name = TableName(table_id);
639   request.set_name(name);
640   request.set_row_key_prefix(std::move(row_key_prefix));
641   MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
642   ClientUtils::MakeNonIdempotentCall(
643       *client_, clone_rpc_retry_policy(), metadata_update_policy,
644       &AdminClient::DropRowRange, request, "DropRowByPrefix", status);
645 
646   return google::cloud::MakeStatusFromRpcError(status);
647 }
648 
AsyncDropRowsByPrefix(CompletionQueue & cq,std::string const & table_id,std::string row_key_prefix)649 future<Status> TableAdmin::AsyncDropRowsByPrefix(CompletionQueue& cq,
650                                                  std::string const& table_id,
651                                                  std::string row_key_prefix) {
652   google::bigtable::admin::v2::DropRowRangeRequest request;
653   auto name = TableName(table_id);
654   request.set_name(name);
655   request.set_row_key_prefix(std::move(row_key_prefix));
656 
657   auto client = client_;
658   return google::cloud::internal::StartRetryAsyncUnaryRpc(
659              cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
660              Idempotency::kIdempotent,
661              [client, name](grpc::ClientContext* context,
662                             btadmin::DropRowRangeRequest const& request,
663                             grpc::CompletionQueue* cq) {
664                MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
665                    .Setup(*context);
666                return client->AsyncDropRowRange(context, request, cq);
667              },
668              std::move(request))
669       .then([](future<StatusOr<google::protobuf::Empty>> r) {
670         return r.get().status();
671       });
672 }
673 
WaitForConsistency(std::string const & table_id,std::string const & consistency_token)674 google::cloud::future<StatusOr<Consistency>> TableAdmin::WaitForConsistency(
675     std::string const& table_id, std::string const& consistency_token) {
676   CompletionQueue cq;
677   std::thread([](CompletionQueue cq) { cq.Run(); }, cq).detach();
678 
679   return AsyncWaitForConsistency(cq, table_id, consistency_token)
680       .then([cq](future<StatusOr<Consistency>> f) mutable {
681         cq.Shutdown();
682         return f.get();
683       });
684 }
685 
686 google::cloud::future<StatusOr<Consistency>>
AsyncWaitForConsistency(CompletionQueue & cq,std::string const & table_id,std::string const & consistency_token)687 TableAdmin::AsyncWaitForConsistency(CompletionQueue& cq,
688                                     std::string const& table_id,
689                                     std::string const& consistency_token) {
690   class AsyncWaitForConsistencyState
691       : public std::enable_shared_from_this<AsyncWaitForConsistencyState> {
692    public:
693     static future<StatusOr<Consistency>> Create(
694         CompletionQueue cq, std::string table_id, std::string consistency_token,
695         TableAdmin const& table_admin,
696         std::unique_ptr<PollingPolicy> polling_policy) {
697       std::shared_ptr<AsyncWaitForConsistencyState> state(
698           new AsyncWaitForConsistencyState(
699               std::move(cq), std::move(table_id), std::move(consistency_token),
700               table_admin, std::move(polling_policy)));
701 
702       state->StartIteration();
703       return state->promise_.get_future();
704     }
705 
706    private:
707     AsyncWaitForConsistencyState(CompletionQueue cq, std::string table_id,
708                                  std::string consistency_token,
709                                  TableAdmin const& table_admin,
710                                  std::unique_ptr<PollingPolicy> polling_policy)
711         : cq_(std::move(cq)),
712           table_id_(std::move(table_id)),
713           consistency_token_(std::move(consistency_token)),
714           table_admin_(table_admin),
715           polling_policy_(std::move(polling_policy)) {}
716 
717     void StartIteration() {
718       auto self = shared_from_this();
719       table_admin_.AsyncCheckConsistency(cq_, table_id_, consistency_token_)
720           .then([self](future<StatusOr<Consistency>> f) {
721             self->OnCheckConsistency(f.get());
722           });
723     }
724 
725     void OnCheckConsistency(StatusOr<Consistency> consistent) {
726       auto self = shared_from_this();
727       if (consistent && *consistent == Consistency::kConsistent) {
728         promise_.set_value(*consistent);
729         return;
730       }
731       auto status = std::move(consistent).status();
732       if (!polling_policy_->OnFailure(status)) {
733         promise_.set_value(std::move(status));
734         return;
735       }
736       cq_.MakeRelativeTimer(polling_policy_->WaitPeriod())
737           .then([self](future<StatusOr<std::chrono::system_clock::time_point>>
738                            result) {
739             if (auto tp = result.get()) {
740               self->StartIteration();
741             } else {
742               self->promise_.set_value(tp.status());
743             }
744           });
745     }
746 
747     CompletionQueue cq_;
748     std::string table_id_;
749     std::string consistency_token_;
750     TableAdmin table_admin_;
751     std::unique_ptr<PollingPolicy> polling_policy_;
752     google::cloud::promise<StatusOr<Consistency>> promise_;
753   };
754 
755   return AsyncWaitForConsistencyState::Create(cq, table_id, consistency_token,
756                                               *this, clone_polling_policy());
757 }
758 
DropAllRows(std::string const & table_id)759 Status TableAdmin::DropAllRows(std::string const& table_id) {
760   grpc::Status status;
761   btadmin::DropRowRangeRequest request;
762   auto name = TableName(table_id);
763   request.set_name(name);
764   request.set_delete_all_data_from_table(true);
765   MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
766   ClientUtils::MakeNonIdempotentCall(
767       *client_, clone_rpc_retry_policy(), metadata_update_policy,
768       &AdminClient::DropRowRange, request, "DropAllRows", status);
769 
770   return google::cloud::MakeStatusFromRpcError(status);
771 }
772 
AsyncDropAllRows(CompletionQueue & cq,std::string const & table_id)773 future<Status> TableAdmin::AsyncDropAllRows(CompletionQueue& cq,
774                                             std::string const& table_id) {
775   google::bigtable::admin::v2::DropRowRangeRequest request;
776   auto name = TableName(table_id);
777   request.set_name(name);
778   request.set_delete_all_data_from_table(true);
779 
780   auto client = client_;
781   return google::cloud::internal::StartRetryAsyncUnaryRpc(
782              cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
783              Idempotency::kIdempotent,
784              [client, name](grpc::ClientContext* context,
785                             btadmin::DropRowRangeRequest const& request,
786                             grpc::CompletionQueue* cq) {
787                MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
788                    .Setup(*context);
789                return client->AsyncDropRowRange(context, request, cq);
790              },
791              std::move(request))
792       .then([](future<StatusOr<google::protobuf::Empty>> r) {
793         return r.get().status();
794       });
795 }
796 
GenerateConsistencyToken(std::string const & table_id)797 StatusOr<std::string> TableAdmin::GenerateConsistencyToken(
798     std::string const& table_id) {
799   grpc::Status status;
800   btadmin::GenerateConsistencyTokenRequest request;
801   auto name = TableName(table_id);
802   request.set_name(name);
803   MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
804 
805   auto response = ClientUtils::MakeCall(
806       *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
807       metadata_update_policy, &AdminClient::GenerateConsistencyToken, request,
808       "GenerateConsistencyToken", status, Idempotency::kIdempotent);
809 
810   if (!status.ok()) {
811     return google::cloud::MakeStatusFromRpcError(status);
812   }
813   return std::move(*response.mutable_consistency_token());
814 }
815 
AsyncGenerateConsistencyToken(CompletionQueue & cq,std::string const & table_id)816 future<StatusOr<std::string>> TableAdmin::AsyncGenerateConsistencyToken(
817     CompletionQueue& cq, std::string const& table_id) {
818   btadmin::GenerateConsistencyTokenRequest request;
819   auto name = TableName(table_id);
820   request.set_name(name);
821 
822   auto client = client_;
823   return google::cloud::internal::StartRetryAsyncUnaryRpc(
824              cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
825              Idempotency::kIdempotent,
826              [client, name](
827                  grpc::ClientContext* context,
828                  btadmin::GenerateConsistencyTokenRequest const& request,
829                  grpc::CompletionQueue* cq) {
830                MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
831                    .Setup(*context);
832                return client->AsyncGenerateConsistencyToken(context, request,
833                                                             cq);
834              },
835              std::move(request))
836       .then([](future<StatusOr<btadmin::GenerateConsistencyTokenResponse>> fut)
837                 -> StatusOr<std::string> {
838         auto result = fut.get();
839         if (!result) {
840           return result.status();
841         }
842         return std::move(*result->mutable_consistency_token());
843       });
844 }
845 
CheckConsistency(std::string const & table_id,std::string const & consistency_token)846 StatusOr<Consistency> TableAdmin::CheckConsistency(
847     std::string const& table_id, std::string const& consistency_token) {
848   grpc::Status status;
849   btadmin::CheckConsistencyRequest request;
850   auto name = TableName(table_id);
851   request.set_name(name);
852   request.set_consistency_token(consistency_token);
853   MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
854 
855   auto response = ClientUtils::MakeCall(
856       *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
857       metadata_update_policy, &AdminClient::CheckConsistency, request,
858       "CheckConsistency", status, Idempotency::kIdempotent);
859 
860   if (!status.ok()) {
861     return google::cloud::MakeStatusFromRpcError(status);
862   }
863 
864   return response.consistent() ? Consistency::kConsistent
865                                : Consistency::kInconsistent;
866 }
867 
AsyncCheckConsistency(CompletionQueue & cq,std::string const & table_id,std::string const & consistency_token)868 future<StatusOr<Consistency>> TableAdmin::AsyncCheckConsistency(
869     CompletionQueue& cq, std::string const& table_id,
870     std::string const& consistency_token) {
871   btadmin::CheckConsistencyRequest request;
872   auto name = TableName(table_id);
873   request.set_name(name);
874   request.set_consistency_token(consistency_token);
875 
876   auto client = client_;
877   return google::cloud::internal::StartRetryAsyncUnaryRpc(
878              cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
879              Idempotency::kIdempotent,
880              [client, name](grpc::ClientContext* context,
881                             btadmin::CheckConsistencyRequest const& request,
882                             grpc::CompletionQueue* cq) {
883                MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
884                    .Setup(*context);
885                return client->AsyncCheckConsistency(context, request, cq);
886              },
887              std::move(request))
888       .then([](future<StatusOr<btadmin::CheckConsistencyResponse>> fut)
889                 -> StatusOr<Consistency> {
890         auto result = fut.get();
891         if (!result) {
892           return result.status();
893         }
894 
895         return result->consistent() ? Consistency::kConsistent
896                                     : Consistency::kInconsistent;
897       });
898 }
899 
GetIamPolicy(std::string const & table_id)900 StatusOr<google::iam::v1::Policy> TableAdmin::GetIamPolicy(
901     std::string const& table_id) {
902   grpc::Status status;
903   auto rpc_policy = clone_rpc_retry_policy();
904   auto backoff_policy = clone_rpc_backoff_policy();
905 
906   ::google::iam::v1::GetIamPolicyRequest request;
907   auto resource = TableName(table_id);
908   request.set_resource(resource);
909 
910   MetadataUpdatePolicy metadata_update_policy(resource,
911                                               MetadataParamTypes::RESOURCE);
912 
913   auto proto = ClientUtils::MakeCall(
914       *(client_), *rpc_policy, *backoff_policy, metadata_update_policy,
915       &AdminClient::GetIamPolicy, request, "GetIamPolicy", status,
916       Idempotency::kIdempotent);
917 
918   if (!status.ok()) {
919     return MakeStatusFromRpcError(status);
920   }
921 
922   return proto;
923 }
924 
AsyncGetIamPolicy(CompletionQueue & cq,std::string const & table_id)925 future<StatusOr<google::iam::v1::Policy>> TableAdmin::AsyncGetIamPolicy(
926     CompletionQueue& cq, std::string const& table_id) {
927   ::google::iam::v1::GetIamPolicyRequest request;
928   auto resource = TableName(table_id);
929   request.set_resource(resource);
930 
931   auto client = client_;
932   return google::cloud::internal::StartRetryAsyncUnaryRpc(
933       cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
934       Idempotency::kIdempotent,
935       [client, resource](grpc::ClientContext* context,
936                          ::google::iam::v1::GetIamPolicyRequest const& request,
937                          grpc::CompletionQueue* cq) {
938         MetadataUpdatePolicy(resource, MetadataParamTypes::RESOURCE)
939             .Setup(*context);
940         return client->AsyncGetIamPolicy(context, request, cq);
941       },
942       std::move(request));
943 }
944 
SetIamPolicy(std::string const & table_id,google::iam::v1::Policy const & iam_policy)945 StatusOr<google::iam::v1::Policy> TableAdmin::SetIamPolicy(
946     std::string const& table_id, google::iam::v1::Policy const& iam_policy) {
947   grpc::Status status;
948   auto rpc_policy = clone_rpc_retry_policy();
949   auto backoff_policy = clone_rpc_backoff_policy();
950 
951   ::google::iam::v1::SetIamPolicyRequest request;
952   auto resource = TableName(table_id);
953   request.set_resource(resource);
954   *request.mutable_policy() = iam_policy;
955 
956   MetadataUpdatePolicy metadata_update_policy(resource,
957                                               MetadataParamTypes::RESOURCE);
958 
959   auto proto = ClientUtils::MakeCall(
960       *(client_), *rpc_policy, *backoff_policy, metadata_update_policy,
961       &AdminClient::SetIamPolicy, request, "SetIamPolicy", status,
962       Idempotency::kIdempotent);
963 
964   if (!status.ok()) {
965     return MakeStatusFromRpcError(status);
966   }
967 
968   return proto;
969 }
970 
AsyncSetIamPolicy(CompletionQueue & cq,std::string const & table_id,google::iam::v1::Policy const & iam_policy)971 future<StatusOr<google::iam::v1::Policy>> TableAdmin::AsyncSetIamPolicy(
972     CompletionQueue& cq, std::string const& table_id,
973     google::iam::v1::Policy const& iam_policy) {
974   ::google::iam::v1::SetIamPolicyRequest request;
975   auto resource = TableName(table_id);
976   request.set_resource(resource);
977   *request.mutable_policy() = iam_policy;
978 
979   auto client = client_;
980   return google::cloud::internal::StartRetryAsyncUnaryRpc(
981       cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
982       Idempotency::kIdempotent,
983       [client, resource](grpc::ClientContext* context,
984                          ::google::iam::v1::SetIamPolicyRequest const& request,
985                          grpc::CompletionQueue* cq) {
986         MetadataUpdatePolicy(resource, MetadataParamTypes::RESOURCE)
987             .Setup(*context);
988         return client->AsyncSetIamPolicy(context, request, cq);
989       },
990       std::move(request));
991 }
992 
TestIamPermissions(std::string const & table_id,std::vector<std::string> const & permissions)993 StatusOr<std::vector<std::string>> TableAdmin::TestIamPermissions(
994     std::string const& table_id, std::vector<std::string> const& permissions) {
995   grpc::Status status;
996   ::google::iam::v1::TestIamPermissionsRequest request;
997   auto resource = TableName(table_id);
998   request.set_resource(resource);
999 
1000   // Copy the policies in effect for the operation.
1001   auto rpc_policy = clone_rpc_retry_policy();
1002   auto backoff_policy = clone_rpc_backoff_policy();
1003 
1004   for (auto const& permission : permissions) {
1005     request.add_permissions(permission);
1006   }
1007 
1008   MetadataUpdatePolicy metadata_update_policy(resource,
1009                                               MetadataParamTypes::RESOURCE);
1010 
1011   auto response = ClientUtils::MakeCall(
1012       *(client_), *rpc_policy, *backoff_policy, metadata_update_policy,
1013       &AdminClient::TestIamPermissions, request, "TestIamPermissions", status,
1014       Idempotency::kIdempotent);
1015 
1016   std::vector<std::string> resource_permissions;
1017 
1018   for (auto& permission : *response.mutable_permissions()) {
1019     resource_permissions.push_back(permission);
1020   }
1021 
1022   if (!status.ok()) {
1023     return MakeStatusFromRpcError(status);
1024   }
1025 
1026   return resource_permissions;
1027 }
1028 
AsyncTestIamPermissions(CompletionQueue & cq,std::string const & table_id,std::vector<std::string> const & permissions)1029 future<StatusOr<std::vector<std::string>>> TableAdmin::AsyncTestIamPermissions(
1030     CompletionQueue& cq, std::string const& table_id,
1031     std::vector<std::string> const& permissions) {
1032   ::google::iam::v1::TestIamPermissionsRequest request;
1033   auto resource = TableName(table_id);
1034   request.set_resource(resource);
1035   for (auto const& permission : permissions) {
1036     request.add_permissions(permission);
1037   }
1038 
1039   auto client = client_;
1040   return google::cloud::internal::StartRetryAsyncUnaryRpc(
1041              cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
1042              Idempotency::kIdempotent,
1043              [client, resource](
1044                  grpc::ClientContext* context,
1045                  ::google::iam::v1::TestIamPermissionsRequest const& request,
1046                  grpc::CompletionQueue* cq) {
1047                MetadataUpdatePolicy(resource, MetadataParamTypes::RESOURCE)
1048                    .Setup(*context);
1049                return client->AsyncTestIamPermissions(context, request, cq);
1050              },
1051              std::move(request))
1052       .then([](future<StatusOr<::google::iam::v1::TestIamPermissionsResponse>>
1053                    response_fut) -> StatusOr<std::vector<std::string>> {
1054         auto response = response_fut.get();
1055         if (!response) {
1056           return response.status();
1057         }
1058         std::vector<std::string> res;
1059         res.reserve(response->permissions_size());
1060         std::move(response->mutable_permissions()->begin(),
1061                   response->mutable_permissions()->end(),
1062                   std::back_inserter(res));
1063         return res;
1064       });
1065 }
1066 
InstanceName() const1067 std::string TableAdmin::InstanceName() const {
1068   return "projects/" + client_->project() + "/instances/" + instance_id_;
1069 }
1070 
1071 }  // namespace BIGTABLE_CLIENT_NS
1072 }  // namespace bigtable
1073 }  // namespace cloud
1074 }  // namespace google
1075