1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/bigtable/internal/async_bulk_apply.h"
16 #include "absl/memory/memory.h"
17
18 namespace google {
19 namespace cloud {
20 namespace bigtable {
21 inline namespace BIGTABLE_CLIENT_NS {
22 namespace internal {
23
Create(CompletionQueue cq,std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,IdempotentMutationPolicy & idempotent_policy,MetadataUpdatePolicy metadata_update_policy,std::shared_ptr<bigtable::DataClient> client,std::string const & app_profile_id,std::string const & table_name,BulkMutation mut)24 future<std::vector<FailedMutation>> AsyncRetryBulkApply::Create(
25 CompletionQueue cq, std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
26 std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
27 IdempotentMutationPolicy& idempotent_policy,
28 MetadataUpdatePolicy metadata_update_policy,
29 std::shared_ptr<bigtable::DataClient> client,
30 std::string const& app_profile_id, std::string const& table_name,
31 BulkMutation mut) {
32 std::shared_ptr<AsyncRetryBulkApply> bulk_apply(new AsyncRetryBulkApply(
33 std::move(rpc_retry_policy), std::move(rpc_backoff_policy),
34 idempotent_policy, std::move(metadata_update_policy), std::move(client),
35 app_profile_id, table_name, std::move(mut)));
36 bulk_apply->StartIterationIfNeeded(std::move(cq));
37 return bulk_apply->promise_.get_future();
38 }
39
AsyncRetryBulkApply(std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,IdempotentMutationPolicy & idempotent_policy,MetadataUpdatePolicy metadata_update_policy,std::shared_ptr<bigtable::DataClient> client,std::string const & app_profile_id,std::string const & table_name,BulkMutation mut)40 AsyncRetryBulkApply::AsyncRetryBulkApply(
41 std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
42 std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
43 IdempotentMutationPolicy& idempotent_policy,
44 MetadataUpdatePolicy metadata_update_policy,
45 std::shared_ptr<bigtable::DataClient> client,
46 std::string const& app_profile_id, std::string const& table_name,
47 BulkMutation mut)
48 : rpc_retry_policy_(std::move(rpc_retry_policy)),
49 rpc_backoff_policy_(std::move(rpc_backoff_policy)),
50 metadata_update_policy_(std::move(metadata_update_policy)),
51 client_(std::move(client)),
52 state_(app_profile_id, table_name, idempotent_policy, std::move(mut)) {}
53
StartIterationIfNeeded(CompletionQueue cq)54 void AsyncRetryBulkApply::StartIterationIfNeeded(CompletionQueue cq) {
55 if (!state_.HasPendingMutations()) {
56 // There is nothing to do, so just satisfy the future and return. Note that
57 // in the case of the retry policy begin expired we hit this point because
58 // the mutations are no longer "pending", they are all resolved with a
59 // error status.
60 promise_.set_value(std::move(state_).OnRetryDone());
61 return;
62 }
63
64 auto context = absl::make_unique<grpc::ClientContext>();
65 rpc_retry_policy_->Setup(*context);
66 rpc_backoff_policy_->Setup(*context);
67 metadata_update_policy_.Setup(*context);
68 auto client = client_;
69 auto self = shared_from_this();
70 cq.MakeStreamingReadRpc(
71 [client](grpc::ClientContext* context,
72 google::bigtable::v2::MutateRowsRequest const& request,
73 grpc::CompletionQueue* cq) {
74 return client->PrepareAsyncMutateRows(context, request, cq);
75 },
76 state_.BeforeStart(), std::move(context),
77 [self, cq](google::bigtable::v2::MutateRowsResponse r) {
78 self->OnRead(std::move(r));
79 return make_ready_future(true);
80 },
81 [self, cq](Status s) { self->OnFinish(cq, std::move(s)); });
82 }
83
OnRead(google::bigtable::v2::MutateRowsResponse response)84 void AsyncRetryBulkApply::OnRead(
85 google::bigtable::v2::MutateRowsResponse response) {
86 state_.OnRead(response);
87 }
88
OnFinish(CompletionQueue cq,Status status)89 void AsyncRetryBulkApply::OnFinish(CompletionQueue cq, Status status) {
90 state_.OnFinish(std::move(status));
91 StartIterationIfNeeded(std::move(cq));
92 }
93
94 } // namespace internal
95 } // namespace BIGTABLE_CLIENT_NS
96 } // namespace bigtable
97 } // namespace cloud
98 } // namespace google
99