1 // Copyright 2017 Google Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/bigtable/table_admin.h"
16 #include "google/cloud/bigtable/internal/async_retry_multi_page.h"
17 #include "google/cloud/bigtable/internal/async_retry_op.h"
18 #include "google/cloud/bigtable/internal/async_retry_unary_rpc_and_poll.h"
19 #include "google/cloud/bigtable/internal/unary_client_utils.h"
20 #include "google/cloud/grpc_error_delegate.h"
21 #include "google/cloud/internal/async_retry_unary_rpc.h"
22 #include "google/cloud/internal/retry_policy.h"
23 #include "google/cloud/internal/time_utils.h"
24 #include <google/protobuf/duration.pb.h>
25 #include <sstream>
26
27 namespace btadmin = ::google::bigtable::admin::v2;
28
29 namespace google {
30 namespace cloud {
31 namespace bigtable {
32 inline namespace BIGTABLE_CLIENT_NS {
33 static_assert(std::is_copy_constructible<bigtable::TableAdmin>::value,
34 "bigtable::TableAdmin must be constructible");
35 static_assert(std::is_copy_assignable<bigtable::TableAdmin>::value,
36 "bigtable::TableAdmin must be assignable");
37
38 // NOLINTNEXTLINE(readability-identifier-naming)
39 constexpr TableAdmin::TableView TableAdmin::VIEW_UNSPECIFIED;
40 // NOLINTNEXTLINE(readability-identifier-naming)
41 constexpr TableAdmin::TableView TableAdmin::NAME_ONLY;
42 // NOLINTNEXTLINE(readability-identifier-naming)
43 constexpr TableAdmin::TableView TableAdmin::SCHEMA_VIEW;
44 // NOLINTNEXTLINE(readability-identifier-naming)
45 constexpr TableAdmin::TableView TableAdmin::REPLICATION_VIEW;
46 // NOLINTNEXTLINE(readability-identifier-naming)
47 constexpr TableAdmin::TableView TableAdmin::FULL;
48
49 /// Shortcuts to avoid typing long names over and over.
50 using ClientUtils = bigtable::internal::UnaryClientUtils<AdminClient>;
51 using google::cloud::internal::Idempotency;
52
CreateTable(std::string table_id,TableConfig config)53 StatusOr<btadmin::Table> TableAdmin::CreateTable(std::string table_id,
54 TableConfig config) {
55 grpc::Status status;
56
57 auto request = std::move(config).as_proto();
58 request.set_parent(instance_name());
59 request.set_table_id(std::move(table_id));
60
61 // This is a non-idempotent API, use the correct retry loop for this type of
62 // operation.
63 auto result = ClientUtils::MakeNonIdempotentCall(
64 *client_, clone_rpc_retry_policy(), clone_metadata_update_policy(),
65 &AdminClient::CreateTable, request, "CreateTable", status);
66
67 if (!status.ok()) {
68 return google::cloud::MakeStatusFromRpcError(status);
69 }
70 return result;
71 }
72
AsyncCreateTable(CompletionQueue & cq,std::string table_id,TableConfig config)73 future<StatusOr<btadmin::Table>> TableAdmin::AsyncCreateTable(
74 CompletionQueue& cq, std::string table_id, TableConfig config) {
75 btadmin::CreateTableRequest request = std::move(config).as_proto();
76 request.set_parent(instance_name());
77 request.set_table_id(std::move(table_id));
78
79 auto client = client_;
80 auto metadata_update_policy = clone_metadata_update_policy();
81 return google::cloud::internal::StartRetryAsyncUnaryRpc(
82 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
83 Idempotency::kNonIdempotent,
84 [client, metadata_update_policy](
85 grpc::ClientContext* context,
86 btadmin::CreateTableRequest const& request,
87 grpc::CompletionQueue* cq) {
88 metadata_update_policy.Setup(*context);
89 return client->AsyncCreateTable(context, request, cq);
90 },
91 std::move(request));
92 }
93
AsyncGetTable(CompletionQueue & cq,std::string const & table_id,btadmin::Table::View view)94 future<StatusOr<google::bigtable::admin::v2::Table>> TableAdmin::AsyncGetTable(
95 CompletionQueue& cq, std::string const& table_id,
96 btadmin::Table::View view) {
97 google::bigtable::admin::v2::GetTableRequest request{};
98 auto name = TableName(table_id);
99 request.set_name(name);
100 request.set_view(view);
101
102 // Copy the client because we lack C++14 extended lambda captures.
103 auto client = client_;
104 auto metadata_update_policy = clone_metadata_update_policy();
105 return google::cloud::internal::StartRetryAsyncUnaryRpc(
106 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
107 Idempotency::kIdempotent,
108 [client, metadata_update_policy](
109 grpc::ClientContext* context,
110 google::bigtable::admin::v2::GetTableRequest const& request,
111 grpc::CompletionQueue* cq) {
112 metadata_update_policy.Setup(*context);
113 return client->AsyncGetTable(context, request, cq);
114 },
115 std::move(request));
116 }
117
ListTables(btadmin::Table::View view)118 StatusOr<std::vector<btadmin::Table>> TableAdmin::ListTables(
119 btadmin::Table::View view) {
120 grpc::Status status;
121
122 // Copy the policies in effect for the operation.
123 auto rpc_policy = clone_rpc_retry_policy();
124 auto backoff_policy = clone_rpc_backoff_policy();
125
126 // Build the RPC request, try to minimize copying.
127 std::vector<btadmin::Table> result;
128 std::string page_token;
129 do {
130 btadmin::ListTablesRequest request;
131 request.set_page_token(std::move(page_token));
132 request.set_parent(instance_name());
133 request.set_view(view);
134
135 auto response = ClientUtils::MakeCall(
136 *client_, *rpc_policy, *backoff_policy, clone_metadata_update_policy(),
137 &AdminClient::ListTables, request, "TableAdmin", status,
138 Idempotency::kIdempotent);
139
140 if (!status.ok()) {
141 return google::cloud::MakeStatusFromRpcError(status);
142 }
143
144 for (auto& x : *response.mutable_tables()) {
145 result.emplace_back(std::move(x));
146 }
147 page_token = std::move(*response.mutable_next_page_token());
148 } while (!page_token.empty());
149 return result;
150 }
151
AsyncListTables(CompletionQueue & cq,btadmin::Table::View view)152 future<StatusOr<std::vector<btadmin::Table>>> TableAdmin::AsyncListTables(
153 CompletionQueue& cq, btadmin::Table::View view) {
154 auto client = client_;
155 btadmin::ListTablesRequest request;
156 request.set_parent(instance_name());
157 request.set_view(view);
158
159 return internal::StartAsyncRetryMultiPage(
160 __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
161 clone_metadata_update_policy(),
162 [client](grpc::ClientContext* context,
163 btadmin::ListTablesRequest const& request,
164 grpc::CompletionQueue* cq) {
165 return client->AsyncListTables(context, request, cq);
166 },
167 std::move(request), std::vector<btadmin::Table>(),
168 [](std::vector<btadmin::Table> acc,
169 btadmin::ListTablesResponse const& response) {
170 std::move(response.tables().begin(), response.tables().end(),
171 std::back_inserter(acc));
172 return acc;
173 },
174 cq);
175 }
176
GetTable(std::string const & table_id,btadmin::Table::View view)177 StatusOr<btadmin::Table> TableAdmin::GetTable(std::string const& table_id,
178 btadmin::Table::View view) {
179 grpc::Status status;
180 btadmin::GetTableRequest request;
181 auto name = TableName(table_id);
182 request.set_name(name);
183 request.set_view(view);
184
185 MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
186
187 auto result = ClientUtils::MakeCall(
188 *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
189 metadata_update_policy, &AdminClient::GetTable, request, "GetTable",
190 status, Idempotency::kIdempotent);
191 if (!status.ok()) {
192 return google::cloud::MakeStatusFromRpcError(status);
193 }
194
195 return result;
196 }
197
DeleteTable(std::string const & table_id)198 Status TableAdmin::DeleteTable(std::string const& table_id) {
199 grpc::Status status;
200 btadmin::DeleteTableRequest request;
201 auto name = TableName(table_id);
202 request.set_name(name);
203
204 MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
205
206 // This is a non-idempotent API, use the correct retry loop for this type of
207 // operation.
208 ClientUtils::MakeNonIdempotentCall(
209 *client_, clone_rpc_retry_policy(), metadata_update_policy,
210 &AdminClient::DeleteTable, request, "DeleteTable", status);
211
212 return google::cloud::MakeStatusFromRpcError(status);
213 }
214
AsyncDeleteTable(CompletionQueue & cq,std::string const & table_id)215 future<Status> TableAdmin::AsyncDeleteTable(CompletionQueue& cq,
216 std::string const& table_id) {
217 grpc::Status status;
218 btadmin::DeleteTableRequest request;
219 auto name = TableName(table_id);
220 request.set_name(name);
221
222 auto client = client_;
223 return google::cloud::internal::StartRetryAsyncUnaryRpc(
224 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
225 Idempotency::kIdempotent,
226 [client, name](
227 grpc::ClientContext* context,
228 google::bigtable::admin::v2::DeleteTableRequest const& request,
229 grpc::CompletionQueue* cq) {
230 MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
231 .Setup(*context);
232 return client->AsyncDeleteTable(context, request, cq);
233 },
234 std::move(request))
235 .then([](future<StatusOr<google::protobuf::Empty>> r) {
236 return r.get().status();
237 });
238 }
239
240 google::bigtable::admin::v2::CreateBackupRequest
AsProto(std::string instance_name) const241 TableAdmin::CreateBackupParams::AsProto(std::string instance_name) const {
242 google::bigtable::admin::v2::CreateBackupRequest proto;
243 proto.set_parent(instance_name + "/clusters/" + cluster_id);
244 proto.set_backup_id(backup_id);
245 proto.mutable_backup()->set_source_table(std::move(instance_name) +
246 "/tables/" + table_name);
247 *proto.mutable_backup()->mutable_expire_time() =
248 google::cloud::internal::ToProtoTimestamp(expire_time);
249 return proto;
250 }
251
CreateBackup(CreateBackupParams const & params)252 StatusOr<google::bigtable::admin::v2::Backup> TableAdmin::CreateBackup(
253 CreateBackupParams const& params) {
254 CompletionQueue cq;
255 std::thread([](CompletionQueue cq) { cq.Run(); }, cq).detach();
256 return AsyncCreateBackup(cq, params)
257 .then(
258 [cq](
259 future<StatusOr<google::bigtable::admin::v2::Backup>> f) mutable {
260 cq.Shutdown();
261 return f.get();
262 })
263 .get();
264 }
265
266 future<StatusOr<google::bigtable::admin::v2::Backup>>
AsyncCreateBackup(CompletionQueue & cq,CreateBackupParams const & params)267 TableAdmin::AsyncCreateBackup(CompletionQueue& cq,
268 CreateBackupParams const& params) {
269 auto request = params.AsProto(instance_name());
270 MetadataUpdatePolicy metadata_update_policy(request.parent(),
271 MetadataParamTypes::PARENT);
272 auto client = client_;
273 return internal::AsyncStartPollAfterRetryUnaryRpc<
274 google::bigtable::admin::v2::Backup>(
275 __func__, clone_polling_policy(), clone_rpc_retry_policy(),
276 clone_rpc_backoff_policy(),
277 internal::ConstantIdempotencyPolicy(Idempotency::kNonIdempotent),
278 metadata_update_policy, client,
279 [client](grpc::ClientContext* context,
280 google::bigtable::admin::v2::CreateBackupRequest const& request,
281 grpc::CompletionQueue* cq) {
282 return client->AsyncCreateBackup(context, request, cq);
283 },
284 std::move(request), cq);
285 }
286
GetBackup(std::string const & cluster_id,std::string const & backup_id)287 StatusOr<google::bigtable::admin::v2::Backup> TableAdmin::GetBackup(
288 std::string const& cluster_id, std::string const& backup_id) {
289 grpc::Status status;
290 btadmin::GetBackupRequest request;
291 std::string name =
292 instance_name() + "/clusters/" + cluster_id + "/backups/" + backup_id;
293 request.set_name(name);
294
295 MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
296
297 auto result = ClientUtils::MakeCall(
298 *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
299 metadata_update_policy, &AdminClient::GetBackup, request, "GetBackup",
300 status, Idempotency::kIdempotent);
301 if (!status.ok()) {
302 return google::cloud::MakeStatusFromRpcError(status);
303 }
304
305 return result;
306 }
307
308 future<StatusOr<google::bigtable::admin::v2::Backup>>
AsyncGetBackup(CompletionQueue & cq,std::string const & cluster_id,std::string const & backup_id)309 TableAdmin::AsyncGetBackup(CompletionQueue& cq, std::string const& cluster_id,
310 std::string const& backup_id) {
311 google::bigtable::admin::v2::GetBackupRequest request{};
312 std::string name =
313 instance_name() + "/clusters/" + cluster_id + "/backups/" + backup_id;
314 request.set_name(name);
315
316 // Copy the client because we lack C++14 extended lambda captures.
317 auto client = client_;
318 auto metadata_update_policy = clone_metadata_update_policy();
319 return google::cloud::internal::StartRetryAsyncUnaryRpc(
320 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
321 Idempotency::kIdempotent,
322 [client, metadata_update_policy](
323 grpc::ClientContext* context,
324 google::bigtable::admin::v2::GetBackupRequest const& request,
325 grpc::CompletionQueue* cq) {
326 metadata_update_policy.Setup(*context);
327 return client->AsyncGetBackup(context, request, cq);
328 },
329 std::move(request));
330 }
331
332 google::bigtable::admin::v2::UpdateBackupRequest
AsProto(std::string const & instance_name) const333 TableAdmin::UpdateBackupParams::AsProto(
334 std::string const& instance_name) const {
335 google::bigtable::admin::v2::UpdateBackupRequest proto;
336 proto.mutable_backup()->set_name(instance_name + "/clusters/" + cluster_id +
337 "/backups/" + backup_name);
338 *proto.mutable_backup()->mutable_expire_time() =
339 google::cloud::internal::ToProtoTimestamp(expire_time);
340 proto.mutable_update_mask()->add_paths("expire_time");
341 return proto;
342 }
343
UpdateBackup(UpdateBackupParams const & params)344 StatusOr<google::bigtable::admin::v2::Backup> TableAdmin::UpdateBackup(
345 UpdateBackupParams const& params) {
346 grpc::Status status;
347 btadmin::UpdateBackupRequest request = params.AsProto(instance_name());
348
349 MetadataUpdatePolicy metadata_update_policy(request.backup().name(),
350 MetadataParamTypes::BACKUP_NAME);
351
352 auto result = ClientUtils::MakeCall(
353 *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
354 metadata_update_policy, &AdminClient::UpdateBackup, request,
355 "UpdateBackup", status, Idempotency::kIdempotent);
356 if (!status.ok()) {
357 return google::cloud::MakeStatusFromRpcError(status);
358 }
359
360 return result;
361 }
362
363 future<StatusOr<google::bigtable::admin::v2::Backup>>
AsyncUpdateBackup(CompletionQueue & cq,UpdateBackupParams const & params)364 TableAdmin::AsyncUpdateBackup(CompletionQueue& cq,
365 UpdateBackupParams const& params) {
366 btadmin::UpdateBackupRequest request = params.AsProto(instance_name());
367
368 // Copy the client because we lack C++14 extended lambda captures.
369 auto client = client_;
370 auto metadata_update_policy = clone_metadata_update_policy();
371 return google::cloud::internal::StartRetryAsyncUnaryRpc(
372 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
373 Idempotency::kIdempotent,
374 [client, metadata_update_policy](
375 grpc::ClientContext* context,
376 google::bigtable::admin::v2::UpdateBackupRequest const& request,
377 grpc::CompletionQueue* cq) {
378 metadata_update_policy.Setup(*context);
379 return client->AsyncUpdateBackup(context, request, cq);
380 },
381 std::move(request));
382 }
383
DeleteBackup(google::bigtable::admin::v2::Backup const & backup)384 Status TableAdmin::DeleteBackup(
385 google::bigtable::admin::v2::Backup const& backup) {
386 grpc::Status status;
387 btadmin::DeleteBackupRequest request;
388 request.set_name(backup.name());
389 MetadataUpdatePolicy metadata_update_policy(request.name(),
390 MetadataParamTypes::NAME);
391 auto result = ClientUtils::MakeCall(
392 *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
393 metadata_update_policy, &AdminClient::DeleteBackup, request,
394 "DeleteBackup", status, Idempotency::kIdempotent);
395 if (!status.ok()) {
396 return google::cloud::MakeStatusFromRpcError(status);
397 }
398
399 return {};
400 }
401
AsyncDeleteBackup(CompletionQueue & cq,google::bigtable::admin::v2::Backup const & backup)402 future<Status> TableAdmin::AsyncDeleteBackup(
403 CompletionQueue& cq, google::bigtable::admin::v2::Backup const& backup) {
404 grpc::Status status;
405 btadmin::DeleteBackupRequest request;
406 request.set_name(backup.name());
407 auto client = client_;
408 auto metadata_update_policy = clone_metadata_update_policy();
409 return google::cloud::internal::StartRetryAsyncUnaryRpc(
410 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
411 Idempotency::kIdempotent,
412 [client, metadata_update_policy](
413 grpc::ClientContext* context,
414 google::bigtable::admin::v2::DeleteBackupRequest const&
415 request,
416 grpc::CompletionQueue* cq) {
417 metadata_update_policy.Setup(*context);
418
419 return client->AsyncDeleteBackup(context, request, cq);
420 },
421 std::move(request))
422 .then([](future<StatusOr<google::protobuf::Empty>> r) {
423 return r.get().status();
424 });
425 }
426
DeleteBackup(std::string const & cluster_id,std::string const & backup_id)427 Status TableAdmin::DeleteBackup(std::string const& cluster_id,
428 std::string const& backup_id) {
429 grpc::Status status;
430 btadmin::DeleteBackupRequest request;
431 request.set_name(instance_name() + "/clusters/" + cluster_id + "/backups/" +
432 backup_id);
433
434 MetadataUpdatePolicy metadata_update_policy(request.name(),
435 MetadataParamTypes::NAME);
436
437 auto result = ClientUtils::MakeCall(
438 *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
439 metadata_update_policy, &AdminClient::DeleteBackup, request,
440 "DeleteBackup", status, Idempotency::kIdempotent);
441 if (!status.ok()) {
442 return google::cloud::MakeStatusFromRpcError(status);
443 }
444
445 return {};
446 }
447
AsyncDeleteBackup(CompletionQueue & cq,std::string const & cluster_id,std::string const & backup_id)448 future<Status> TableAdmin::AsyncDeleteBackup(CompletionQueue& cq,
449 std::string const& cluster_id,
450 std::string const& backup_id) {
451 grpc::Status status;
452 btadmin::DeleteBackupRequest request;
453 request.set_name(instance_name() + "/clusters/" + cluster_id + "/backups/" +
454 backup_id);
455 auto client = client_;
456 auto metadata_update_policy = clone_metadata_update_policy();
457 return google::cloud::internal::StartRetryAsyncUnaryRpc(
458 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
459 Idempotency::kIdempotent,
460 [client, metadata_update_policy](
461 grpc::ClientContext* context,
462 google::bigtable::admin::v2::DeleteBackupRequest const&
463 request,
464 grpc::CompletionQueue* cq) {
465 metadata_update_policy.Setup(*context);
466
467 return client->AsyncDeleteBackup(context, request, cq);
468 },
469 std::move(request))
470 .then([](future<StatusOr<google::protobuf::Empty>> r) {
471 return r.get().status();
472 });
473 }
474
475 google::bigtable::admin::v2::ListBackupsRequest
AsProto(std::string const & instance_name) const476 TableAdmin::ListBackupsParams::AsProto(std::string const& instance_name) const {
477 google::bigtable::admin::v2::ListBackupsRequest proto;
478 proto.set_parent(cluster_id ? instance_name + "/clusters/" + *cluster_id
479 : instance_name + "/clusters/-");
480 if (filter) *proto.mutable_filter() = *filter;
481 if (order_by) *proto.mutable_order_by() = *order_by;
482 return proto;
483 }
484
485 StatusOr<std::vector<google::bigtable::admin::v2::Backup>>
ListBackups(ListBackupsParams const & params)486 TableAdmin::ListBackups(ListBackupsParams const& params) {
487 grpc::Status status;
488
489 // Copy the policies in effect for the operation.
490 auto rpc_policy = clone_rpc_retry_policy();
491 auto backoff_policy = clone_rpc_backoff_policy();
492
493 // Build the RPC request, try to minimize copying.
494 std::vector<btadmin::Backup> result;
495 btadmin::ListBackupsRequest request = params.AsProto(instance_name());
496
497 MetadataUpdatePolicy metadata_update_policy(request.parent(),
498 MetadataParamTypes::PARENT);
499
500 std::string page_token;
501 do {
502 request.set_page_token(std::move(page_token));
503
504 auto response = ClientUtils::MakeCall(
505 *client_, *rpc_policy, *backoff_policy, metadata_update_policy,
506 &AdminClient::ListBackups, request, "TableAdmin", status,
507 Idempotency::kIdempotent);
508
509 if (!status.ok()) {
510 return google::cloud::MakeStatusFromRpcError(status);
511 }
512
513 for (auto& x : *response.mutable_backups()) {
514 result.emplace_back(std::move(x));
515 }
516 page_token = std::move(*response.mutable_next_page_token());
517 } while (!page_token.empty());
518 return result;
519 }
520
521 future<StatusOr<std::vector<google::bigtable::admin::v2::Backup>>>
AsyncListBackups(CompletionQueue & cq,ListBackupsParams const & params)522 TableAdmin::AsyncListBackups(CompletionQueue& cq,
523 ListBackupsParams const& params) {
524 auto client = client_;
525 btadmin::ListBackupsRequest request = params.AsProto(instance_name());
526 return internal::StartAsyncRetryMultiPage(
527 __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
528 clone_metadata_update_policy(),
529 [client](grpc::ClientContext* context,
530 btadmin::ListBackupsRequest const& request,
531 grpc::CompletionQueue* cq) {
532 return client->AsyncListBackups(context, request, cq);
533 },
534 std::move(request), std::vector<btadmin::Backup>(),
535 [](std::vector<btadmin::Backup> acc,
536 btadmin::ListBackupsResponse const& response) {
537 std::move(response.backups().begin(), response.backups().end(),
538 std::back_inserter(acc));
539 return acc;
540 },
541 cq);
542 }
543
544 google::bigtable::admin::v2::RestoreTableRequest
AsProto(std::string const & instance_name) const545 TableAdmin::RestoreTableParams::AsProto(
546 std::string const& instance_name) const {
547 google::bigtable::admin::v2::RestoreTableRequest proto;
548 proto.set_parent(instance_name);
549 proto.set_table_id(table_id);
550 proto.set_backup(instance_name + "/clusters/" + cluster_id + "/backups/" +
551 backup_id);
552 return proto;
553 }
554
RestoreTable(RestoreTableParams const & params)555 StatusOr<google::bigtable::admin::v2::Table> TableAdmin::RestoreTable(
556 RestoreTableParams const& params) {
557 CompletionQueue cq;
558 std::thread([](CompletionQueue cq) { cq.Run(); }, cq).detach();
559 return AsyncRestoreTable(cq, params)
560 .then(
561 [cq](future<StatusOr<google::bigtable::admin::v2::Table>> f) mutable {
562 cq.Shutdown();
563 return f.get();
564 })
565 .get();
566 }
567
568 future<StatusOr<google::bigtable::admin::v2::Table>>
AsyncRestoreTable(CompletionQueue & cq,RestoreTableParams const & params)569 TableAdmin::AsyncRestoreTable(CompletionQueue& cq,
570 RestoreTableParams const& params) {
571 MetadataUpdatePolicy metadata_update_policy(instance_name(),
572 MetadataParamTypes::PARENT);
573 auto client = client_;
574 return internal::AsyncStartPollAfterRetryUnaryRpc<
575 google::bigtable::admin::v2::Table>(
576 __func__, clone_polling_policy(), clone_rpc_retry_policy(),
577 clone_rpc_backoff_policy(),
578 internal::ConstantIdempotencyPolicy(Idempotency::kNonIdempotent),
579 metadata_update_policy, client,
580 [client](grpc::ClientContext* context,
581 google::bigtable::admin::v2::RestoreTableRequest const& request,
582 grpc::CompletionQueue* cq) {
583 return client->AsyncRestoreTable(context, request, cq);
584 },
585 params.AsProto(instance_name()), cq);
586 }
587
ModifyColumnFamilies(std::string const & table_id,std::vector<ColumnFamilyModification> modifications)588 StatusOr<btadmin::Table> TableAdmin::ModifyColumnFamilies(
589 std::string const& table_id,
590 std::vector<ColumnFamilyModification> modifications) {
591 grpc::Status status;
592
593 btadmin::ModifyColumnFamiliesRequest request;
594 auto name = TableName(table_id);
595 request.set_name(name);
596 for (auto& m : modifications) {
597 *request.add_modifications() = std::move(m).as_proto();
598 }
599 MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
600 auto result = ClientUtils::MakeNonIdempotentCall(
601 *client_, clone_rpc_retry_policy(), metadata_update_policy,
602 &AdminClient::ModifyColumnFamilies, request, "ModifyColumnFamilies",
603 status);
604
605 if (!status.ok()) {
606 return google::cloud::MakeStatusFromRpcError(status);
607 }
608 return result;
609 }
610
AsyncModifyColumnFamilies(CompletionQueue & cq,std::string const & table_id,std::vector<ColumnFamilyModification> modifications)611 future<StatusOr<btadmin::Table>> TableAdmin::AsyncModifyColumnFamilies(
612 CompletionQueue& cq, std::string const& table_id,
613 std::vector<ColumnFamilyModification> modifications) {
614 btadmin::ModifyColumnFamiliesRequest request;
615 auto name = TableName(table_id);
616 request.set_name(name);
617 for (auto& m : modifications) {
618 *request.add_modifications() = std::move(m).as_proto();
619 }
620
621 auto client = client_;
622 return google::cloud::internal::StartRetryAsyncUnaryRpc(
623 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
624 Idempotency::kIdempotent,
625 [client, name](grpc::ClientContext* context,
626 btadmin::ModifyColumnFamiliesRequest const& request,
627 grpc::CompletionQueue* cq) {
628 MetadataUpdatePolicy(name, MetadataParamTypes::NAME).Setup(*context);
629 return client->AsyncModifyColumnFamilies(context, request, cq);
630 },
631 std::move(request));
632 }
633
DropRowsByPrefix(std::string const & table_id,std::string row_key_prefix)634 Status TableAdmin::DropRowsByPrefix(std::string const& table_id,
635 std::string row_key_prefix) {
636 grpc::Status status;
637 btadmin::DropRowRangeRequest request;
638 auto name = TableName(table_id);
639 request.set_name(name);
640 request.set_row_key_prefix(std::move(row_key_prefix));
641 MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
642 ClientUtils::MakeNonIdempotentCall(
643 *client_, clone_rpc_retry_policy(), metadata_update_policy,
644 &AdminClient::DropRowRange, request, "DropRowByPrefix", status);
645
646 return google::cloud::MakeStatusFromRpcError(status);
647 }
648
AsyncDropRowsByPrefix(CompletionQueue & cq,std::string const & table_id,std::string row_key_prefix)649 future<Status> TableAdmin::AsyncDropRowsByPrefix(CompletionQueue& cq,
650 std::string const& table_id,
651 std::string row_key_prefix) {
652 google::bigtable::admin::v2::DropRowRangeRequest request;
653 auto name = TableName(table_id);
654 request.set_name(name);
655 request.set_row_key_prefix(std::move(row_key_prefix));
656
657 auto client = client_;
658 return google::cloud::internal::StartRetryAsyncUnaryRpc(
659 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
660 Idempotency::kIdempotent,
661 [client, name](grpc::ClientContext* context,
662 btadmin::DropRowRangeRequest const& request,
663 grpc::CompletionQueue* cq) {
664 MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
665 .Setup(*context);
666 return client->AsyncDropRowRange(context, request, cq);
667 },
668 std::move(request))
669 .then([](future<StatusOr<google::protobuf::Empty>> r) {
670 return r.get().status();
671 });
672 }
673
WaitForConsistency(std::string const & table_id,std::string const & consistency_token)674 google::cloud::future<StatusOr<Consistency>> TableAdmin::WaitForConsistency(
675 std::string const& table_id, std::string const& consistency_token) {
676 CompletionQueue cq;
677 std::thread([](CompletionQueue cq) { cq.Run(); }, cq).detach();
678
679 return AsyncWaitForConsistency(cq, table_id, consistency_token)
680 .then([cq](future<StatusOr<Consistency>> f) mutable {
681 cq.Shutdown();
682 return f.get();
683 });
684 }
685
686 google::cloud::future<StatusOr<Consistency>>
AsyncWaitForConsistency(CompletionQueue & cq,std::string const & table_id,std::string const & consistency_token)687 TableAdmin::AsyncWaitForConsistency(CompletionQueue& cq,
688 std::string const& table_id,
689 std::string const& consistency_token) {
690 class AsyncWaitForConsistencyState
691 : public std::enable_shared_from_this<AsyncWaitForConsistencyState> {
692 public:
693 static future<StatusOr<Consistency>> Create(
694 CompletionQueue cq, std::string table_id, std::string consistency_token,
695 TableAdmin const& table_admin,
696 std::unique_ptr<PollingPolicy> polling_policy) {
697 std::shared_ptr<AsyncWaitForConsistencyState> state(
698 new AsyncWaitForConsistencyState(
699 std::move(cq), std::move(table_id), std::move(consistency_token),
700 table_admin, std::move(polling_policy)));
701
702 state->StartIteration();
703 return state->promise_.get_future();
704 }
705
706 private:
707 AsyncWaitForConsistencyState(CompletionQueue cq, std::string table_id,
708 std::string consistency_token,
709 TableAdmin const& table_admin,
710 std::unique_ptr<PollingPolicy> polling_policy)
711 : cq_(std::move(cq)),
712 table_id_(std::move(table_id)),
713 consistency_token_(std::move(consistency_token)),
714 table_admin_(table_admin),
715 polling_policy_(std::move(polling_policy)) {}
716
717 void StartIteration() {
718 auto self = shared_from_this();
719 table_admin_.AsyncCheckConsistency(cq_, table_id_, consistency_token_)
720 .then([self](future<StatusOr<Consistency>> f) {
721 self->OnCheckConsistency(f.get());
722 });
723 }
724
725 void OnCheckConsistency(StatusOr<Consistency> consistent) {
726 auto self = shared_from_this();
727 if (consistent && *consistent == Consistency::kConsistent) {
728 promise_.set_value(*consistent);
729 return;
730 }
731 auto status = std::move(consistent).status();
732 if (!polling_policy_->OnFailure(status)) {
733 promise_.set_value(std::move(status));
734 return;
735 }
736 cq_.MakeRelativeTimer(polling_policy_->WaitPeriod())
737 .then([self](future<StatusOr<std::chrono::system_clock::time_point>>
738 result) {
739 if (auto tp = result.get()) {
740 self->StartIteration();
741 } else {
742 self->promise_.set_value(tp.status());
743 }
744 });
745 }
746
747 CompletionQueue cq_;
748 std::string table_id_;
749 std::string consistency_token_;
750 TableAdmin table_admin_;
751 std::unique_ptr<PollingPolicy> polling_policy_;
752 google::cloud::promise<StatusOr<Consistency>> promise_;
753 };
754
755 return AsyncWaitForConsistencyState::Create(cq, table_id, consistency_token,
756 *this, clone_polling_policy());
757 }
758
DropAllRows(std::string const & table_id)759 Status TableAdmin::DropAllRows(std::string const& table_id) {
760 grpc::Status status;
761 btadmin::DropRowRangeRequest request;
762 auto name = TableName(table_id);
763 request.set_name(name);
764 request.set_delete_all_data_from_table(true);
765 MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
766 ClientUtils::MakeNonIdempotentCall(
767 *client_, clone_rpc_retry_policy(), metadata_update_policy,
768 &AdminClient::DropRowRange, request, "DropAllRows", status);
769
770 return google::cloud::MakeStatusFromRpcError(status);
771 }
772
AsyncDropAllRows(CompletionQueue & cq,std::string const & table_id)773 future<Status> TableAdmin::AsyncDropAllRows(CompletionQueue& cq,
774 std::string const& table_id) {
775 google::bigtable::admin::v2::DropRowRangeRequest request;
776 auto name = TableName(table_id);
777 request.set_name(name);
778 request.set_delete_all_data_from_table(true);
779
780 auto client = client_;
781 return google::cloud::internal::StartRetryAsyncUnaryRpc(
782 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
783 Idempotency::kIdempotent,
784 [client, name](grpc::ClientContext* context,
785 btadmin::DropRowRangeRequest const& request,
786 grpc::CompletionQueue* cq) {
787 MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
788 .Setup(*context);
789 return client->AsyncDropRowRange(context, request, cq);
790 },
791 std::move(request))
792 .then([](future<StatusOr<google::protobuf::Empty>> r) {
793 return r.get().status();
794 });
795 }
796
GenerateConsistencyToken(std::string const & table_id)797 StatusOr<std::string> TableAdmin::GenerateConsistencyToken(
798 std::string const& table_id) {
799 grpc::Status status;
800 btadmin::GenerateConsistencyTokenRequest request;
801 auto name = TableName(table_id);
802 request.set_name(name);
803 MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
804
805 auto response = ClientUtils::MakeCall(
806 *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
807 metadata_update_policy, &AdminClient::GenerateConsistencyToken, request,
808 "GenerateConsistencyToken", status, Idempotency::kIdempotent);
809
810 if (!status.ok()) {
811 return google::cloud::MakeStatusFromRpcError(status);
812 }
813 return std::move(*response.mutable_consistency_token());
814 }
815
AsyncGenerateConsistencyToken(CompletionQueue & cq,std::string const & table_id)816 future<StatusOr<std::string>> TableAdmin::AsyncGenerateConsistencyToken(
817 CompletionQueue& cq, std::string const& table_id) {
818 btadmin::GenerateConsistencyTokenRequest request;
819 auto name = TableName(table_id);
820 request.set_name(name);
821
822 auto client = client_;
823 return google::cloud::internal::StartRetryAsyncUnaryRpc(
824 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
825 Idempotency::kIdempotent,
826 [client, name](
827 grpc::ClientContext* context,
828 btadmin::GenerateConsistencyTokenRequest const& request,
829 grpc::CompletionQueue* cq) {
830 MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
831 .Setup(*context);
832 return client->AsyncGenerateConsistencyToken(context, request,
833 cq);
834 },
835 std::move(request))
836 .then([](future<StatusOr<btadmin::GenerateConsistencyTokenResponse>> fut)
837 -> StatusOr<std::string> {
838 auto result = fut.get();
839 if (!result) {
840 return result.status();
841 }
842 return std::move(*result->mutable_consistency_token());
843 });
844 }
845
CheckConsistency(std::string const & table_id,std::string const & consistency_token)846 StatusOr<Consistency> TableAdmin::CheckConsistency(
847 std::string const& table_id, std::string const& consistency_token) {
848 grpc::Status status;
849 btadmin::CheckConsistencyRequest request;
850 auto name = TableName(table_id);
851 request.set_name(name);
852 request.set_consistency_token(consistency_token);
853 MetadataUpdatePolicy metadata_update_policy(name, MetadataParamTypes::NAME);
854
855 auto response = ClientUtils::MakeCall(
856 *client_, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
857 metadata_update_policy, &AdminClient::CheckConsistency, request,
858 "CheckConsistency", status, Idempotency::kIdempotent);
859
860 if (!status.ok()) {
861 return google::cloud::MakeStatusFromRpcError(status);
862 }
863
864 return response.consistent() ? Consistency::kConsistent
865 : Consistency::kInconsistent;
866 }
867
AsyncCheckConsistency(CompletionQueue & cq,std::string const & table_id,std::string const & consistency_token)868 future<StatusOr<Consistency>> TableAdmin::AsyncCheckConsistency(
869 CompletionQueue& cq, std::string const& table_id,
870 std::string const& consistency_token) {
871 btadmin::CheckConsistencyRequest request;
872 auto name = TableName(table_id);
873 request.set_name(name);
874 request.set_consistency_token(consistency_token);
875
876 auto client = client_;
877 return google::cloud::internal::StartRetryAsyncUnaryRpc(
878 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
879 Idempotency::kIdempotent,
880 [client, name](grpc::ClientContext* context,
881 btadmin::CheckConsistencyRequest const& request,
882 grpc::CompletionQueue* cq) {
883 MetadataUpdatePolicy(name, MetadataParamTypes::NAME)
884 .Setup(*context);
885 return client->AsyncCheckConsistency(context, request, cq);
886 },
887 std::move(request))
888 .then([](future<StatusOr<btadmin::CheckConsistencyResponse>> fut)
889 -> StatusOr<Consistency> {
890 auto result = fut.get();
891 if (!result) {
892 return result.status();
893 }
894
895 return result->consistent() ? Consistency::kConsistent
896 : Consistency::kInconsistent;
897 });
898 }
899
GetIamPolicy(std::string const & table_id)900 StatusOr<google::iam::v1::Policy> TableAdmin::GetIamPolicy(
901 std::string const& table_id) {
902 grpc::Status status;
903 auto rpc_policy = clone_rpc_retry_policy();
904 auto backoff_policy = clone_rpc_backoff_policy();
905
906 ::google::iam::v1::GetIamPolicyRequest request;
907 auto resource = TableName(table_id);
908 request.set_resource(resource);
909
910 MetadataUpdatePolicy metadata_update_policy(resource,
911 MetadataParamTypes::RESOURCE);
912
913 auto proto = ClientUtils::MakeCall(
914 *(client_), *rpc_policy, *backoff_policy, metadata_update_policy,
915 &AdminClient::GetIamPolicy, request, "GetIamPolicy", status,
916 Idempotency::kIdempotent);
917
918 if (!status.ok()) {
919 return MakeStatusFromRpcError(status);
920 }
921
922 return proto;
923 }
924
AsyncGetIamPolicy(CompletionQueue & cq,std::string const & table_id)925 future<StatusOr<google::iam::v1::Policy>> TableAdmin::AsyncGetIamPolicy(
926 CompletionQueue& cq, std::string const& table_id) {
927 ::google::iam::v1::GetIamPolicyRequest request;
928 auto resource = TableName(table_id);
929 request.set_resource(resource);
930
931 auto client = client_;
932 return google::cloud::internal::StartRetryAsyncUnaryRpc(
933 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
934 Idempotency::kIdempotent,
935 [client, resource](grpc::ClientContext* context,
936 ::google::iam::v1::GetIamPolicyRequest const& request,
937 grpc::CompletionQueue* cq) {
938 MetadataUpdatePolicy(resource, MetadataParamTypes::RESOURCE)
939 .Setup(*context);
940 return client->AsyncGetIamPolicy(context, request, cq);
941 },
942 std::move(request));
943 }
944
SetIamPolicy(std::string const & table_id,google::iam::v1::Policy const & iam_policy)945 StatusOr<google::iam::v1::Policy> TableAdmin::SetIamPolicy(
946 std::string const& table_id, google::iam::v1::Policy const& iam_policy) {
947 grpc::Status status;
948 auto rpc_policy = clone_rpc_retry_policy();
949 auto backoff_policy = clone_rpc_backoff_policy();
950
951 ::google::iam::v1::SetIamPolicyRequest request;
952 auto resource = TableName(table_id);
953 request.set_resource(resource);
954 *request.mutable_policy() = iam_policy;
955
956 MetadataUpdatePolicy metadata_update_policy(resource,
957 MetadataParamTypes::RESOURCE);
958
959 auto proto = ClientUtils::MakeCall(
960 *(client_), *rpc_policy, *backoff_policy, metadata_update_policy,
961 &AdminClient::SetIamPolicy, request, "SetIamPolicy", status,
962 Idempotency::kIdempotent);
963
964 if (!status.ok()) {
965 return MakeStatusFromRpcError(status);
966 }
967
968 return proto;
969 }
970
AsyncSetIamPolicy(CompletionQueue & cq,std::string const & table_id,google::iam::v1::Policy const & iam_policy)971 future<StatusOr<google::iam::v1::Policy>> TableAdmin::AsyncSetIamPolicy(
972 CompletionQueue& cq, std::string const& table_id,
973 google::iam::v1::Policy const& iam_policy) {
974 ::google::iam::v1::SetIamPolicyRequest request;
975 auto resource = TableName(table_id);
976 request.set_resource(resource);
977 *request.mutable_policy() = iam_policy;
978
979 auto client = client_;
980 return google::cloud::internal::StartRetryAsyncUnaryRpc(
981 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
982 Idempotency::kIdempotent,
983 [client, resource](grpc::ClientContext* context,
984 ::google::iam::v1::SetIamPolicyRequest const& request,
985 grpc::CompletionQueue* cq) {
986 MetadataUpdatePolicy(resource, MetadataParamTypes::RESOURCE)
987 .Setup(*context);
988 return client->AsyncSetIamPolicy(context, request, cq);
989 },
990 std::move(request));
991 }
992
TestIamPermissions(std::string const & table_id,std::vector<std::string> const & permissions)993 StatusOr<std::vector<std::string>> TableAdmin::TestIamPermissions(
994 std::string const& table_id, std::vector<std::string> const& permissions) {
995 grpc::Status status;
996 ::google::iam::v1::TestIamPermissionsRequest request;
997 auto resource = TableName(table_id);
998 request.set_resource(resource);
999
1000 // Copy the policies in effect for the operation.
1001 auto rpc_policy = clone_rpc_retry_policy();
1002 auto backoff_policy = clone_rpc_backoff_policy();
1003
1004 for (auto const& permission : permissions) {
1005 request.add_permissions(permission);
1006 }
1007
1008 MetadataUpdatePolicy metadata_update_policy(resource,
1009 MetadataParamTypes::RESOURCE);
1010
1011 auto response = ClientUtils::MakeCall(
1012 *(client_), *rpc_policy, *backoff_policy, metadata_update_policy,
1013 &AdminClient::TestIamPermissions, request, "TestIamPermissions", status,
1014 Idempotency::kIdempotent);
1015
1016 std::vector<std::string> resource_permissions;
1017
1018 for (auto& permission : *response.mutable_permissions()) {
1019 resource_permissions.push_back(permission);
1020 }
1021
1022 if (!status.ok()) {
1023 return MakeStatusFromRpcError(status);
1024 }
1025
1026 return resource_permissions;
1027 }
1028
AsyncTestIamPermissions(CompletionQueue & cq,std::string const & table_id,std::vector<std::string> const & permissions)1029 future<StatusOr<std::vector<std::string>>> TableAdmin::AsyncTestIamPermissions(
1030 CompletionQueue& cq, std::string const& table_id,
1031 std::vector<std::string> const& permissions) {
1032 ::google::iam::v1::TestIamPermissionsRequest request;
1033 auto resource = TableName(table_id);
1034 request.set_resource(resource);
1035 for (auto const& permission : permissions) {
1036 request.add_permissions(permission);
1037 }
1038
1039 auto client = client_;
1040 return google::cloud::internal::StartRetryAsyncUnaryRpc(
1041 cq, __func__, clone_rpc_retry_policy(), clone_rpc_backoff_policy(),
1042 Idempotency::kIdempotent,
1043 [client, resource](
1044 grpc::ClientContext* context,
1045 ::google::iam::v1::TestIamPermissionsRequest const& request,
1046 grpc::CompletionQueue* cq) {
1047 MetadataUpdatePolicy(resource, MetadataParamTypes::RESOURCE)
1048 .Setup(*context);
1049 return client->AsyncTestIamPermissions(context, request, cq);
1050 },
1051 std::move(request))
1052 .then([](future<StatusOr<::google::iam::v1::TestIamPermissionsResponse>>
1053 response_fut) -> StatusOr<std::vector<std::string>> {
1054 auto response = response_fut.get();
1055 if (!response) {
1056 return response.status();
1057 }
1058 std::vector<std::string> res;
1059 res.reserve(response->permissions_size());
1060 std::move(response->mutable_permissions()->begin(),
1061 response->mutable_permissions()->end(),
1062 std::back_inserter(res));
1063 return res;
1064 });
1065 }
1066
InstanceName() const1067 std::string TableAdmin::InstanceName() const {
1068 return "projects/" + client_->project() + "/instances/" + instance_id_;
1069 }
1070
1071 } // namespace BIGTABLE_CLIENT_NS
1072 } // namespace bigtable
1073 } // namespace cloud
1074 } // namespace google
1075