1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_READ_WRITE_STREAM_IMPL_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_READ_WRITE_STREAM_IMPL_H
17 
18 #include "google/cloud/completion_queue.h"
19 #include "google/cloud/grpc_error_delegate.h"
20 #include "google/cloud/internal/completion_queue_impl.h"
21 #include "google/cloud/version.h"
22 #include "absl/functional/function_ref.h"
23 #include "absl/types/optional.h"
24 #include <grpcpp/support/async_stream.h>
25 #include <memory>
26 
27 namespace google {
28 namespace cloud {
29 inline namespace GOOGLE_CLOUD_CPP_NS {
30 namespace internal {
31 
32 template <typename Request, typename Response>
33 class AsyncStreamingReadWriteRpc {
34  public:
35   virtual ~AsyncStreamingReadWriteRpc() = default;
36 
37   virtual void Cancel() = 0;
38   virtual future<bool> Start() = 0;
39   virtual future<absl::optional<Response>> Read() = 0;
40   virtual future<bool> Write(Request const&, grpc::WriteOptions) = 0;
41   virtual future<bool> WritesDone() = 0;
42   virtual future<Status> Finish() = 0;
43 };
44 
45 /**
46  * Wrapper for Asynchronous Streaming Read/Write RPCs.
47  *
48  * A wrapper for gRPC's asynchronous streaming read-write APIs, which can be
49  * combined with `google::cloud::CompletionQueue` and `google::cloud::future<>`
50  * to provide easier-to-use abstractions.
51  */
52 template <typename Request, typename Response>
53 class AsyncStreamingReadWriteRpcImpl
54     : public AsyncStreamingReadWriteRpc<Request, Response> {
55  public:
AsyncStreamingReadWriteRpcImpl(std::shared_ptr<CompletionQueueImpl> cq,std::unique_ptr<grpc::ClientContext> context,std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request,Response>> stream)56   AsyncStreamingReadWriteRpcImpl(
57       std::shared_ptr<CompletionQueueImpl> cq,
58       std::unique_ptr<grpc::ClientContext> context,
59       std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request, Response>>
60           stream)
61       : cq_(std::move(cq)),
62         context_(std::move(context)),
63         stream_(std::move(stream)) {}
64 
Cancel()65   void Cancel() override { context_->TryCancel(); }
66 
Start()67   future<bool> Start() override {
68     struct OnStart : public AsyncGrpcOperation {
69       promise<bool> p;
70       bool Notify(bool ok) override {
71         p.set_value(ok);
72         return true;
73       }
74       void Cancel() override {}
75     };
76     auto op = std::make_shared<OnStart>();
77     cq_->StartOperation(op, [&](void* tag) { stream_->StartCall(tag); });
78     return op->p.get_future();
79   }
80 
Read()81   future<absl::optional<Response>> Read() override {
82     struct OnRead : public AsyncGrpcOperation {
83       promise<absl::optional<Response>> p;
84       Response response;
85       bool Notify(bool ok) override {
86         if (!ok) {
87           p.set_value({});
88           return true;
89         }
90         p.set_value(std::move(response));
91         return true;
92       }
93       void Cancel() override {}
94     };
95     auto op = std::make_shared<OnRead>();
96     cq_->StartOperation(op,
97                         [&](void* tag) { stream_->Read(&op->response, tag); });
98     return op->p.get_future();
99   }
100 
Write(Request const & request,grpc::WriteOptions options)101   future<bool> Write(Request const& request,
102                      grpc::WriteOptions options) override {
103     struct OnWrite : public AsyncGrpcOperation {
104       promise<bool> p;
105       bool Notify(bool ok) override {
106         p.set_value(ok);
107         return true;
108       }
109       void Cancel() override {}
110     };
111     auto op = std::make_shared<OnWrite>();
112     cq_->StartOperation(op, [&](void* tag) {
113       stream_->Write(request, std::move(options), tag);
114     });
115     return op->p.get_future();
116   }
117 
WritesDone()118   future<bool> WritesDone() override {
119     struct OnWritesDone : public AsyncGrpcOperation {
120       promise<bool> p;
121       bool Notify(bool ok) override {
122         p.set_value(ok);
123         return true;
124       }
125       void Cancel() override {}
126     };
127     auto op = std::make_shared<OnWritesDone>();
128     cq_->StartOperation(op, [&](void* tag) { stream_->WritesDone(tag); });
129     return op->p.get_future();
130   }
131 
Finish()132   future<Status> Finish() override {
133     struct OnFinish : public AsyncGrpcOperation {
134       promise<Status> p;
135       grpc::Status status;
136       bool Notify(bool /*ok*/) override {
137         p.set_value(MakeStatusFromRpcError(std::move(status)));
138         return true;
139       }
140       void Cancel() override {}
141     };
142     auto op = std::make_shared<OnFinish>();
143     cq_->StartOperation(op,
144                         [&](void* tag) { stream_->Finish(&op->status, tag); });
145     return op->p.get_future();
146   }
147 
148  private:
149   std::shared_ptr<CompletionQueueImpl> cq_;
150   std::unique_ptr<grpc::ClientContext> context_;
151   std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request, Response>>
152       stream_;
153 };
154 
155 template <typename Request, typename Response>
156 using PrepareAsyncReadWriteRpc = absl::FunctionRef<
157     std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request, Response>>(
158         grpc::ClientContext*, grpc::CompletionQueue*)>;
159 
160 /**
161  * Make an asynchronous streaming read/write RPC using `CompletionQueue`.
162  *
163  * @note in the past we would have made this a member function of the
164  *     `CompletionQueue` class. We want to avoid this as (a) we are not certain
165  *     this is the long term API we want to expose, (b) once in the public
166  *     `CompletionQueue` class it is hard to remove member functions.  Placing
167  *     the API in the `internal::` namespace give us more flexibility for the
168  *     future, at the cost of (hopefully controlled) breaks in encapsulation.
169  */
170 template <typename Request, typename Response>
171 std::unique_ptr<AsyncStreamingReadWriteRpc<Request, Response>>
MakeStreamingReadWriteRpc(CompletionQueue & cq,std::unique_ptr<grpc::ClientContext> context,PrepareAsyncReadWriteRpc<Request,Response> async_call)172 MakeStreamingReadWriteRpc(
173     CompletionQueue& cq, std::unique_ptr<grpc::ClientContext> context,
174     PrepareAsyncReadWriteRpc<Request, Response> async_call) {
175   auto cq_impl = GetCompletionQueueImpl(cq);
176   auto stream = async_call(context.get(), &cq_impl->cq());
177   return absl::make_unique<AsyncStreamingReadWriteRpcImpl<Request, Response>>(
178       std::move(cq_impl), std::move(context), std::move(stream));
179 }
180 
181 }  // namespace internal
182 }  // namespace GOOGLE_CLOUD_CPP_NS
183 }  // namespace cloud
184 }  // namespace google
185 
186 #endif  // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_READ_WRITE_STREAM_IMPL_H
187