1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/bigtable/internal/async_bulk_apply.h"
16 #include "absl/memory/memory.h"
17 
18 namespace google {
19 namespace cloud {
20 namespace bigtable {
21 inline namespace BIGTABLE_CLIENT_NS {
22 namespace internal {
23 
Create(CompletionQueue cq,std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,IdempotentMutationPolicy & idempotent_policy,MetadataUpdatePolicy metadata_update_policy,std::shared_ptr<bigtable::DataClient> client,std::string const & app_profile_id,std::string const & table_name,BulkMutation mut)24 future<std::vector<FailedMutation>> AsyncRetryBulkApply::Create(
25     CompletionQueue cq, std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
26     std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
27     IdempotentMutationPolicy& idempotent_policy,
28     MetadataUpdatePolicy metadata_update_policy,
29     std::shared_ptr<bigtable::DataClient> client,
30     std::string const& app_profile_id, std::string const& table_name,
31     BulkMutation mut) {
32   std::shared_ptr<AsyncRetryBulkApply> bulk_apply(new AsyncRetryBulkApply(
33       std::move(rpc_retry_policy), std::move(rpc_backoff_policy),
34       idempotent_policy, std::move(metadata_update_policy), std::move(client),
35       app_profile_id, table_name, std::move(mut)));
36   bulk_apply->StartIterationIfNeeded(std::move(cq));
37   return bulk_apply->promise_.get_future();
38 }
39 
AsyncRetryBulkApply(std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,IdempotentMutationPolicy & idempotent_policy,MetadataUpdatePolicy metadata_update_policy,std::shared_ptr<bigtable::DataClient> client,std::string const & app_profile_id,std::string const & table_name,BulkMutation mut)40 AsyncRetryBulkApply::AsyncRetryBulkApply(
41     std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
42     std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
43     IdempotentMutationPolicy& idempotent_policy,
44     MetadataUpdatePolicy metadata_update_policy,
45     std::shared_ptr<bigtable::DataClient> client,
46     std::string const& app_profile_id, std::string const& table_name,
47     BulkMutation mut)
48     : rpc_retry_policy_(std::move(rpc_retry_policy)),
49       rpc_backoff_policy_(std::move(rpc_backoff_policy)),
50       metadata_update_policy_(std::move(metadata_update_policy)),
51       client_(std::move(client)),
52       state_(app_profile_id, table_name, idempotent_policy, std::move(mut)) {}
53 
StartIterationIfNeeded(CompletionQueue cq)54 void AsyncRetryBulkApply::StartIterationIfNeeded(CompletionQueue cq) {
55   if (!state_.HasPendingMutations()) {
56     // There is nothing to do, so just satisfy the future and return. Note that
57     // in the case of the retry policy begin expired we hit this point because
58     // the mutations are no longer "pending", they are all resolved with a
59     // error status.
60     promise_.set_value(std::move(state_).OnRetryDone());
61     return;
62   }
63 
64   auto context = absl::make_unique<grpc::ClientContext>();
65   rpc_retry_policy_->Setup(*context);
66   rpc_backoff_policy_->Setup(*context);
67   metadata_update_policy_.Setup(*context);
68   auto client = client_;
69   auto self = shared_from_this();
70   cq.MakeStreamingReadRpc(
71       [client](grpc::ClientContext* context,
72                google::bigtable::v2::MutateRowsRequest const& request,
73                grpc::CompletionQueue* cq) {
74         return client->PrepareAsyncMutateRows(context, request, cq);
75       },
76       state_.BeforeStart(), std::move(context),
77       [self, cq](google::bigtable::v2::MutateRowsResponse r) {
78         self->OnRead(std::move(r));
79         return make_ready_future(true);
80       },
81       [self, cq](Status s) { self->OnFinish(cq, std::move(s)); });
82 }
83 
OnRead(google::bigtable::v2::MutateRowsResponse response)84 void AsyncRetryBulkApply::OnRead(
85     google::bigtable::v2::MutateRowsResponse response) {
86   state_.OnRead(response);
87 }
88 
OnFinish(CompletionQueue cq,Status status)89 void AsyncRetryBulkApply::OnFinish(CompletionQueue cq, Status status) {
90   state_.OnFinish(std::move(status));
91   StartIterationIfNeeded(std::move(cq));
92 }
93 
94 }  // namespace internal
95 }  // namespace BIGTABLE_CLIENT_NS
96 }  // namespace bigtable
97 }  // namespace cloud
98 }  // namespace google
99