1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_READ_WRITE_STREAM_IMPL_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_READ_WRITE_STREAM_IMPL_H
17
18 #include "google/cloud/completion_queue.h"
19 #include "google/cloud/grpc_error_delegate.h"
20 #include "google/cloud/internal/completion_queue_impl.h"
21 #include "google/cloud/version.h"
22 #include "absl/functional/function_ref.h"
23 #include "absl/types/optional.h"
24 #include <grpcpp/support/async_stream.h>
25 #include <memory>
26
27 namespace google {
28 namespace cloud {
29 inline namespace GOOGLE_CLOUD_CPP_NS {
30 namespace internal {
31
32 template <typename Request, typename Response>
33 class AsyncStreamingReadWriteRpc {
34 public:
35 virtual ~AsyncStreamingReadWriteRpc() = default;
36
37 virtual void Cancel() = 0;
38 virtual future<bool> Start() = 0;
39 virtual future<absl::optional<Response>> Read() = 0;
40 virtual future<bool> Write(Request const&, grpc::WriteOptions) = 0;
41 virtual future<bool> WritesDone() = 0;
42 virtual future<Status> Finish() = 0;
43 };
44
45 /**
46 * Wrapper for Asynchronous Streaming Read/Write RPCs.
47 *
48 * A wrapper for gRPC's asynchronous streaming read-write APIs, which can be
49 * combined with `google::cloud::CompletionQueue` and `google::cloud::future<>`
50 * to provide easier-to-use abstractions.
51 */
52 template <typename Request, typename Response>
53 class AsyncStreamingReadWriteRpcImpl
54 : public AsyncStreamingReadWriteRpc<Request, Response> {
55 public:
AsyncStreamingReadWriteRpcImpl(std::shared_ptr<CompletionQueueImpl> cq,std::unique_ptr<grpc::ClientContext> context,std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request,Response>> stream)56 AsyncStreamingReadWriteRpcImpl(
57 std::shared_ptr<CompletionQueueImpl> cq,
58 std::unique_ptr<grpc::ClientContext> context,
59 std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request, Response>>
60 stream)
61 : cq_(std::move(cq)),
62 context_(std::move(context)),
63 stream_(std::move(stream)) {}
64
Cancel()65 void Cancel() override { context_->TryCancel(); }
66
Start()67 future<bool> Start() override {
68 struct OnStart : public AsyncGrpcOperation {
69 promise<bool> p;
70 bool Notify(bool ok) override {
71 p.set_value(ok);
72 return true;
73 }
74 void Cancel() override {}
75 };
76 auto op = std::make_shared<OnStart>();
77 cq_->StartOperation(op, [&](void* tag) { stream_->StartCall(tag); });
78 return op->p.get_future();
79 }
80
Read()81 future<absl::optional<Response>> Read() override {
82 struct OnRead : public AsyncGrpcOperation {
83 promise<absl::optional<Response>> p;
84 Response response;
85 bool Notify(bool ok) override {
86 if (!ok) {
87 p.set_value({});
88 return true;
89 }
90 p.set_value(std::move(response));
91 return true;
92 }
93 void Cancel() override {}
94 };
95 auto op = std::make_shared<OnRead>();
96 cq_->StartOperation(op,
97 [&](void* tag) { stream_->Read(&op->response, tag); });
98 return op->p.get_future();
99 }
100
Write(Request const & request,grpc::WriteOptions options)101 future<bool> Write(Request const& request,
102 grpc::WriteOptions options) override {
103 struct OnWrite : public AsyncGrpcOperation {
104 promise<bool> p;
105 bool Notify(bool ok) override {
106 p.set_value(ok);
107 return true;
108 }
109 void Cancel() override {}
110 };
111 auto op = std::make_shared<OnWrite>();
112 cq_->StartOperation(op, [&](void* tag) {
113 stream_->Write(request, std::move(options), tag);
114 });
115 return op->p.get_future();
116 }
117
WritesDone()118 future<bool> WritesDone() override {
119 struct OnWritesDone : public AsyncGrpcOperation {
120 promise<bool> p;
121 bool Notify(bool ok) override {
122 p.set_value(ok);
123 return true;
124 }
125 void Cancel() override {}
126 };
127 auto op = std::make_shared<OnWritesDone>();
128 cq_->StartOperation(op, [&](void* tag) { stream_->WritesDone(tag); });
129 return op->p.get_future();
130 }
131
Finish()132 future<Status> Finish() override {
133 struct OnFinish : public AsyncGrpcOperation {
134 promise<Status> p;
135 grpc::Status status;
136 bool Notify(bool /*ok*/) override {
137 p.set_value(MakeStatusFromRpcError(std::move(status)));
138 return true;
139 }
140 void Cancel() override {}
141 };
142 auto op = std::make_shared<OnFinish>();
143 cq_->StartOperation(op,
144 [&](void* tag) { stream_->Finish(&op->status, tag); });
145 return op->p.get_future();
146 }
147
148 private:
149 std::shared_ptr<CompletionQueueImpl> cq_;
150 std::unique_ptr<grpc::ClientContext> context_;
151 std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request, Response>>
152 stream_;
153 };
154
155 template <typename Request, typename Response>
156 using PrepareAsyncReadWriteRpc = absl::FunctionRef<
157 std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request, Response>>(
158 grpc::ClientContext*, grpc::CompletionQueue*)>;
159
160 /**
161 * Make an asynchronous streaming read/write RPC using `CompletionQueue`.
162 *
163 * @note in the past we would have made this a member function of the
164 * `CompletionQueue` class. We want to avoid this as (a) we are not certain
165 * this is the long term API we want to expose, (b) once in the public
166 * `CompletionQueue` class it is hard to remove member functions. Placing
167 * the API in the `internal::` namespace give us more flexibility for the
168 * future, at the cost of (hopefully controlled) breaks in encapsulation.
169 */
170 template <typename Request, typename Response>
171 std::unique_ptr<AsyncStreamingReadWriteRpc<Request, Response>>
MakeStreamingReadWriteRpc(CompletionQueue & cq,std::unique_ptr<grpc::ClientContext> context,PrepareAsyncReadWriteRpc<Request,Response> async_call)172 MakeStreamingReadWriteRpc(
173 CompletionQueue& cq, std::unique_ptr<grpc::ClientContext> context,
174 PrepareAsyncReadWriteRpc<Request, Response> async_call) {
175 auto cq_impl = GetCompletionQueueImpl(cq);
176 auto stream = async_call(context.get(), &cq_impl->cq());
177 return absl::make_unique<AsyncStreamingReadWriteRpcImpl<Request, Response>>(
178 std::move(cq_impl), std::move(context), std::move(stream));
179 }
180
181 } // namespace internal
182 } // namespace GOOGLE_CLOUD_CPP_NS
183 } // namespace cloud
184 } // namespace google
185
186 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_READ_WRITE_STREAM_IMPL_H
187