1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20 
21 // IWYU pragma: private, include <grpcpp/support/client_callback.h>
22 
#include <atomic>
#include <functional>
#include <utility>

#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/call_op_set.h>
#include <grpcpp/impl/codegen/callback_common.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/sync.h>
34 
35 namespace grpc {
36 class Channel;
37 class ClientContext;
38 
39 namespace internal {
40 class RpcMethod;
41 
42 /// Perform a callback-based unary call.  May optionally specify the base
43 /// class of the Request and Response so that the internal calls and structures
44 /// below this may be based on those base classes and thus achieve code reuse
45 /// across different RPCs (e.g., for protobuf, MessageLite would be a base
46 /// class).
47 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
48 template <class InputMessage, class OutputMessage,
49           class BaseInputMessage = InputMessage,
50           class BaseOutputMessage = OutputMessage>
CallbackUnaryCall(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)51 void CallbackUnaryCall(::grpc::ChannelInterface* channel,
52                        const ::grpc::internal::RpcMethod& method,
53                        ::grpc::ClientContext* context,
54                        const InputMessage* request, OutputMessage* result,
55                        std::function<void(::grpc::Status)> on_completion) {
56   static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
57                 "Invalid input message specification");
58   static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
59                 "Invalid output message specification");
60   CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x(
61       channel, method, context, request, result, on_completion);
62 }
63 
64 template <class InputMessage, class OutputMessage>
65 class CallbackUnaryCallImpl {
66  public:
CallbackUnaryCallImpl(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)67   CallbackUnaryCallImpl(::grpc::ChannelInterface* channel,
68                         const ::grpc::internal::RpcMethod& method,
69                         ::grpc::ClientContext* context,
70                         const InputMessage* request, OutputMessage* result,
71                         std::function<void(::grpc::Status)> on_completion) {
72     ::grpc::CompletionQueue* cq = channel->CallbackCQ();
73     GPR_CODEGEN_ASSERT(cq != nullptr);
74     grpc::internal::Call call(channel->CreateCall(method, context, cq));
75 
76     using FullCallOpSet = grpc::internal::CallOpSet<
77         ::grpc::internal::CallOpSendInitialMetadata,
78         grpc::internal::CallOpSendMessage,
79         grpc::internal::CallOpRecvInitialMetadata,
80         grpc::internal::CallOpRecvMessage<OutputMessage>,
81         grpc::internal::CallOpClientSendClose,
82         grpc::internal::CallOpClientRecvStatus>;
83 
84     struct OpSetAndTag {
85       FullCallOpSet opset;
86       grpc::internal::CallbackWithStatusTag tag;
87     };
88     const size_t alloc_sz = sizeof(OpSetAndTag);
89     auto* const alloced = static_cast<OpSetAndTag*>(
90         ::grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(),
91                                                                 alloc_sz));
92     auto* ops = new (&alloced->opset) FullCallOpSet;
93     auto* tag = new (&alloced->tag)
94         grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
95 
96     // TODO(vjpai): Unify code with sync API as much as possible
97     ::grpc::Status s = ops->SendMessagePtr(request);
98     if (!s.ok()) {
99       tag->force_run(s);
100       return;
101     }
102     ops->SendInitialMetadata(&context->send_initial_metadata_,
103                              context->initial_metadata_flags());
104     ops->RecvInitialMetadata(context);
105     ops->RecvMessage(result);
106     ops->AllowNoMessage();
107     ops->ClientSendClose();
108     ops->ClientRecvStatus(context, tag->status_ptr());
109     ops->set_core_cq_tag(tag);
110     call.PerformOps(ops);
111   }
112 };
113 
114 // Base class for public API classes.
115 class ClientReactor {
116  public:
117   virtual ~ClientReactor() = default;
118 
119   /// Called by the library when all operations associated with this RPC have
120   /// completed and all Holds have been removed. OnDone provides the RPC status
121   /// outcome for both successful and failed RPCs. If it is never called on an
122   /// RPC, it indicates an application-level problem (like failure to remove a
123   /// hold).
124   ///
125   /// \param[in] s The status outcome of this RPC
126   virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
127 
128   /// InternalScheduleOnDone is not part of the API and is not meant to be
129   /// overridden. It is virtual to allow successful builds for certain bazel
130   /// build users that only want to depend on gRPC codegen headers and not the
131   /// full library (although this is not a generally-supported option). Although
132   /// the virtual call is slower than a direct call, this function is
133   /// heavyweight and the cost of the virtual call is not much in comparison.
134   /// This function may be removed or devirtualized in the future.
135   virtual void InternalScheduleOnDone(::grpc::Status s);
136 
137   /// InternalTrailersOnly is not part of the API and is not meant to be
138   /// overridden. It is virtual to allow successful builds for certain bazel
139   /// build users that only want to depend on gRPC codegen headers and not the
140   /// full library (although this is not a generally-supported option). Although
141   /// the virtual call is slower than a direct call, this function is
142   /// heavyweight and the cost of the virtual call is not much in comparison.
143   /// This function may be removed or devirtualized in the future.
144   virtual bool InternalTrailersOnly(const grpc_call* call) const;
145 };
146 
147 }  // namespace internal
148 
149 // Forward declarations
150 template <class Request, class Response>
151 class ClientBidiReactor;
152 template <class Response>
153 class ClientReadReactor;
154 template <class Request>
155 class ClientWriteReactor;
156 class ClientUnaryReactor;
157 
158 // NOTE: The streaming objects are not actually implemented in the public API.
159 //       These interfaces are provided for mocking only. Typical applications
160 //       will interact exclusively with the reactors that they define.
161 template <class Request, class Response>
162 class ClientCallbackReaderWriter {
163  public:
~ClientCallbackReaderWriter()164   virtual ~ClientCallbackReaderWriter() {}
165   virtual void StartCall() = 0;
166   virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
167   virtual void WritesDone() = 0;
168   virtual void Read(Response* resp) = 0;
169   virtual void AddHold(int holds) = 0;
170   virtual void RemoveHold() = 0;
171 
172  protected:
BindReactor(ClientBidiReactor<Request,Response> * reactor)173   void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
174     reactor->BindStream(this);
175   }
176 };
177 
178 template <class Response>
179 class ClientCallbackReader {
180  public:
~ClientCallbackReader()181   virtual ~ClientCallbackReader() {}
182   virtual void StartCall() = 0;
183   virtual void Read(Response* resp) = 0;
184   virtual void AddHold(int holds) = 0;
185   virtual void RemoveHold() = 0;
186 
187  protected:
BindReactor(ClientReadReactor<Response> * reactor)188   void BindReactor(ClientReadReactor<Response>* reactor) {
189     reactor->BindReader(this);
190   }
191 };
192 
193 template <class Request>
194 class ClientCallbackWriter {
195  public:
~ClientCallbackWriter()196   virtual ~ClientCallbackWriter() {}
197   virtual void StartCall() = 0;
Write(const Request * req)198   void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
199   virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
WriteLast(const Request * req,::grpc::WriteOptions options)200   void WriteLast(const Request* req, ::grpc::WriteOptions options) {
201     Write(req, options.set_last_message());
202   }
203   virtual void WritesDone() = 0;
204 
205   virtual void AddHold(int holds) = 0;
206   virtual void RemoveHold() = 0;
207 
208  protected:
BindReactor(ClientWriteReactor<Request> * reactor)209   void BindReactor(ClientWriteReactor<Request>* reactor) {
210     reactor->BindWriter(this);
211   }
212 };
213 
214 class ClientCallbackUnary {
215  public:
~ClientCallbackUnary()216   virtual ~ClientCallbackUnary() {}
217   virtual void StartCall() = 0;
218 
219  protected:
220   void BindReactor(ClientUnaryReactor* reactor);
221 };
222 
223 // The following classes are the reactor interfaces that are to be implemented
224 // by the user. They are passed in to the library as an argument to a call on a
225 // stub (either a codegen-ed call or a generic call). The streaming RPC is
226 // activated by calling StartCall, possibly after initiating StartRead,
227 // StartWrite, or AddHold operations on the streaming object. Note that none of
228 // the classes are pure; all reactions have a default empty reaction so that the
229 // user class only needs to override those reactions that it cares about.
230 // The reactor must be passed to the stub invocation before any of the below
231 // operations can be called and its reactions will be invoked by the library in
232 // response to the completion of various operations. Reactions must not include
233 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
234 // waiting on condition variables). Reactions may be invoked concurrently,
235 // except that OnDone is called after all others (assuming proper API usage).
236 // The reactor may not be deleted until OnDone is called.
237 
238 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
239 template <class Request, class Response>
240 class ClientBidiReactor : public internal::ClientReactor {
241  public:
242   /// Activate the RPC and initiate any reads or writes that have been Start'ed
243   /// before this call. All streaming RPCs issued by the client MUST have
244   /// StartCall invoked on them (even if they are canceled) as this call is the
245   /// activation of their lifecycle.
StartCall()246   void StartCall() { stream_->StartCall(); }
247 
248   /// Initiate a read operation (or post it for later initiation if StartCall
249   /// has not yet been invoked).
250   ///
251   /// \param[out] resp Where to eventually store the read message. Valid when
252   ///                  the library calls OnReadDone
StartRead(Response * resp)253   void StartRead(Response* resp) { stream_->Read(resp); }
254 
255   /// Initiate a write operation (or post it for later initiation if StartCall
256   /// has not yet been invoked).
257   ///
258   /// \param[in] req The message to be written. The library does not take
259   ///                ownership but the caller must ensure that the message is
260   ///                not deleted or modified until OnWriteDone is called.
StartWrite(const Request * req)261   void StartWrite(const Request* req) {
262     StartWrite(req, ::grpc::WriteOptions());
263   }
264 
265   /// Initiate/post a write operation with specified options.
266   ///
267   /// \param[in] req The message to be written. The library does not take
268   ///                ownership but the caller must ensure that the message is
269   ///                not deleted or modified until OnWriteDone is called.
270   /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Request * req,::grpc::WriteOptions options)271   void StartWrite(const Request* req, ::grpc::WriteOptions options) {
272     stream_->Write(req, options);
273   }
274 
275   /// Initiate/post a write operation with specified options and an indication
276   /// that this is the last write (like StartWrite and StartWritesDone, merged).
277   /// Note that calling this means that no more calls to StartWrite,
278   /// StartWriteLast, or StartWritesDone are allowed.
279   ///
280   /// \param[in] req The message to be written. The library does not take
281   ///                ownership but the caller must ensure that the message is
282   ///                not deleted or modified until OnWriteDone is called.
283   /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Request * req,::grpc::WriteOptions options)284   void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
285     StartWrite(req, options.set_last_message());
286   }
287 
288   /// Indicate that the RPC will have no more write operations. This can only be
289   /// issued once for a given RPC. This is not required or allowed if
290   /// StartWriteLast is used since that already has the same implication.
291   /// Note that calling this means that no more calls to StartWrite,
292   /// StartWriteLast, or StartWritesDone are allowed.
StartWritesDone()293   void StartWritesDone() { stream_->WritesDone(); }
294 
295   /// Holds are needed if (and only if) this stream has operations that take
296   /// place on it after StartCall but from outside one of the reactions
297   /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
298   ///
299   /// Holds must be added before calling StartCall. If a stream still has a hold
300   /// in place, its resources will not be destroyed even if the status has
301   /// already come in from the wire and there are currently no active callbacks
302   /// outstanding. Similarly, the stream will not call OnDone if there are still
303   /// holds on it.
304   ///
305   /// For example, if a StartRead or StartWrite operation is going to be
306   /// initiated from elsewhere in the application, the application should call
307   /// AddHold or AddMultipleHolds before StartCall.  If there is going to be,
308   /// for example, a read-flow and a write-flow taking place outside the
309   /// reactions, then call AddMultipleHolds(2) before StartCall. When the
310   /// application knows that it won't issue any more read operations (such as
311   /// when a read comes back as not ok), it should issue a RemoveHold(). It
312   /// should also call RemoveHold() again after it does StartWriteLast or
313   /// StartWritesDone that indicates that there will be no more write ops.
314   /// The number of RemoveHold calls must match the total number of AddHold
315   /// calls plus the number of holds added by AddMultipleHolds.
316   /// The argument to AddMultipleHolds must be positive.
AddHold()317   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)318   void AddMultipleHolds(int holds) {
319     GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
320     stream_->AddHold(holds);
321   }
RemoveHold()322   void RemoveHold() { stream_->RemoveHold(); }
323 
324   /// Notifies the application that all operations associated with this RPC
325   /// have completed and all Holds have been removed. OnDone provides the RPC
326   /// status outcome for both successful and failed RPCs and will be called in
327   /// all cases. If it is not called, it indicates an application-level problem
328   /// (like failure to remove a hold).
329   ///
330   /// \param[in] s The status outcome of this RPC
OnDone(const::grpc::Status &)331   void OnDone(const ::grpc::Status& /*s*/) override {}
332 
333   /// Notifies the application that a read of initial metadata from the
334   /// server is done. If the application chooses not to implement this method,
335   /// it can assume that the initial metadata has been read before the first
336   /// call of OnReadDone or OnDone.
337   ///
338   /// \param[in] ok Was the initial metadata read successfully? If false, no
339   ///               new read/write operation will succeed, and any further
340   ///               Start* operations should not be called.
OnReadInitialMetadataDone(bool)341   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
342 
343   /// Notifies the application that a StartRead operation completed.
344   ///
345   /// \param[in] ok Was it successful? If false, no new read/write operation
346   ///               will succeed, and any further Start* should not be called.
OnReadDone(bool)347   virtual void OnReadDone(bool /*ok*/) {}
348 
349   /// Notifies the application that a StartWrite or StartWriteLast operation
350   /// completed.
351   ///
352   /// \param[in] ok Was it successful? If false, no new read/write operation
353   ///               will succeed, and any further Start* should not be called.
OnWriteDone(bool)354   virtual void OnWriteDone(bool /*ok*/) {}
355 
356   /// Notifies the application that a StartWritesDone operation completed. Note
357   /// that this is only used on explicit StartWritesDone operations and not for
358   /// those that are implicitly invoked as part of a StartWriteLast.
359   ///
360   /// \param[in] ok Was it successful? If false, the application will later see
361   ///               the failure reflected as a bad status in OnDone and no
362   ///               further Start* should be called.
OnWritesDoneDone(bool)363   virtual void OnWritesDoneDone(bool /*ok*/) {}
364 
365  private:
366   friend class ClientCallbackReaderWriter<Request, Response>;
BindStream(ClientCallbackReaderWriter<Request,Response> * stream)367   void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
368     stream_ = stream;
369   }
370   ClientCallbackReaderWriter<Request, Response>* stream_;
371 };
372 
373 /// \a ClientReadReactor is the interface for a server-streaming RPC.
374 /// All public methods behave as in ClientBidiReactor.
375 template <class Response>
376 class ClientReadReactor : public internal::ClientReactor {
377  public:
StartCall()378   void StartCall() { reader_->StartCall(); }
StartRead(Response * resp)379   void StartRead(Response* resp) { reader_->Read(resp); }
380 
AddHold()381   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)382   void AddMultipleHolds(int holds) {
383     GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
384     reader_->AddHold(holds);
385   }
RemoveHold()386   void RemoveHold() { reader_->RemoveHold(); }
387 
OnDone(const::grpc::Status &)388   void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)389   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)390   virtual void OnReadDone(bool /*ok*/) {}
391 
392  private:
393   friend class ClientCallbackReader<Response>;
BindReader(ClientCallbackReader<Response> * reader)394   void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
395   ClientCallbackReader<Response>* reader_;
396 };
397 
398 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
399 /// All public methods behave as in ClientBidiReactor.
400 template <class Request>
401 class ClientWriteReactor : public internal::ClientReactor {
402  public:
StartCall()403   void StartCall() { writer_->StartCall(); }
StartWrite(const Request * req)404   void StartWrite(const Request* req) {
405     StartWrite(req, ::grpc::WriteOptions());
406   }
StartWrite(const Request * req,::grpc::WriteOptions options)407   void StartWrite(const Request* req, ::grpc::WriteOptions options) {
408     writer_->Write(req, options);
409   }
StartWriteLast(const Request * req,::grpc::WriteOptions options)410   void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
411     StartWrite(req, options.set_last_message());
412   }
StartWritesDone()413   void StartWritesDone() { writer_->WritesDone(); }
414 
AddHold()415   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)416   void AddMultipleHolds(int holds) {
417     GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
418     writer_->AddHold(holds);
419   }
RemoveHold()420   void RemoveHold() { writer_->RemoveHold(); }
421 
OnDone(const::grpc::Status &)422   void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)423   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)424   virtual void OnWriteDone(bool /*ok*/) {}
OnWritesDoneDone(bool)425   virtual void OnWritesDoneDone(bool /*ok*/) {}
426 
427  private:
428   friend class ClientCallbackWriter<Request>;
BindWriter(ClientCallbackWriter<Request> * writer)429   void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
430 
431   ClientCallbackWriter<Request>* writer_;
432 };
433 
434 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
435 /// This is _not_ a common way of invoking a unary RPC. In practice, this
436 /// option should be used only if the unary RPC wants to receive initial
437 /// metadata without waiting for the response to complete. Most deployments of
438 /// RPC systems do not use this option, but it is needed for generality.
439 /// All public methods behave as in ClientBidiReactor.
440 /// StartCall is included for consistency with the other reactor flavors: even
441 /// though there are no StartRead or StartWrite operations to queue before the
442 /// call (that is part of the unary call itself) and there is no reactor object
443 /// being created as a result of this call, we keep a consistent 2-phase
444 /// initiation API among all the reactor flavors.
445 class ClientUnaryReactor : public internal::ClientReactor {
446  public:
StartCall()447   void StartCall() { call_->StartCall(); }
OnDone(const::grpc::Status &)448   void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)449   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
450 
451  private:
452   friend class ClientCallbackUnary;
BindCall(ClientCallbackUnary * call)453   void BindCall(ClientCallbackUnary* call) { call_ = call; }
454   ClientCallbackUnary* call_;
455 };
456 
457 // Define function out-of-line from class to avoid forward declaration issue
BindReactor(ClientUnaryReactor * reactor)458 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
459   reactor->BindCall(this);
460 }
461 
462 namespace internal {
463 
464 // Forward declare factory classes for friendship
465 template <class Request, class Response>
466 class ClientCallbackReaderWriterFactory;
467 template <class Response>
468 class ClientCallbackReaderFactory;
469 template <class Request>
470 class ClientCallbackWriterFactory;
471 
472 template <class Request, class Response>
473 class ClientCallbackReaderWriterImpl
474     : public ClientCallbackReaderWriter<Request, Response> {
475  public:
476   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)477   static void operator delete(void* /*ptr*/, std::size_t size) {
478     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
479   }
480 
481   // This operator should never be called as the memory should be freed as part
482   // of the arena destruction. It only exists to provide a matching operator
483   // delete to the operator new so that some compilers will not complain (see
484   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
485   // there are no tests catching the compiler warning.
delete(void *,void *)486   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
487 
StartCall()488   void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
489     // This call initiates two batches, plus any backlog, each with a callback
490     // 1. Send initial metadata (unless corked) + recv initial metadata
491     // 2. Any read backlog
492     // 3. Any write backlog
493     // 4. Recv trailing metadata (unless corked)
494     if (!start_corked_) {
495       start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
496                                      context_->initial_metadata_flags());
497     }
498 
499     call_.PerformOps(&start_ops_);
500 
501     {
502       grpc::internal::MutexLock lock(&start_mu_);
503 
504       if (backlog_.read_ops) {
505         call_.PerformOps(&read_ops_);
506       }
507       if (backlog_.write_ops) {
508         call_.PerformOps(&write_ops_);
509       }
510       if (backlog_.writes_done_ops) {
511         call_.PerformOps(&writes_done_ops_);
512       }
513       call_.PerformOps(&finish_ops_);
514       // The last thing in this critical section is to set started_ so that it
515       // can be used lock-free as well.
516       started_.store(true, std::memory_order_release);
517     }
518     // MaybeFinish outside the lock to make sure that destruction of this object
519     // doesn't take place while holding the lock (which would cause the lock to
520     // be released after destruction)
521     this->MaybeFinish(/*from_reaction=*/false);
522   }
523 
Read(Response * msg)524   void Read(Response* msg) override {
525     read_ops_.RecvMessage(msg);
526     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
527     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
528       grpc::internal::MutexLock lock(&start_mu_);
529       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
530         backlog_.read_ops = true;
531         return;
532       }
533     }
534     call_.PerformOps(&read_ops_);
535   }
536 
Write(const Request * msg,::grpc::WriteOptions options)537   void Write(const Request* msg, ::grpc::WriteOptions options)
538       ABSL_LOCKS_EXCLUDED(start_mu_) override {
539     if (options.is_last_message()) {
540       options.set_buffer_hint();
541       write_ops_.ClientSendClose();
542     }
543     // TODO(vjpai): don't assert
544     GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
545     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
546     if (GPR_UNLIKELY(corked_write_needed_)) {
547       write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
548                                      context_->initial_metadata_flags());
549       corked_write_needed_ = false;
550     }
551 
552     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
553       grpc::internal::MutexLock lock(&start_mu_);
554       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
555         backlog_.write_ops = true;
556         return;
557       }
558     }
559     call_.PerformOps(&write_ops_);
560   }
WritesDone()561   void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
562     writes_done_ops_.ClientSendClose();
563     writes_done_tag_.Set(
564         call_.call(),
565         [this](bool ok) {
566           reactor_->OnWritesDoneDone(ok);
567           MaybeFinish(/*from_reaction=*/true);
568         },
569         &writes_done_ops_, /*can_inline=*/false);
570     writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
571     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
572     if (GPR_UNLIKELY(corked_write_needed_)) {
573       writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
574                                            context_->initial_metadata_flags());
575       corked_write_needed_ = false;
576     }
577     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
578       grpc::internal::MutexLock lock(&start_mu_);
579       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
580         backlog_.writes_done_ops = true;
581         return;
582       }
583     }
584     call_.PerformOps(&writes_done_ops_);
585   }
586 
AddHold(int holds)587   void AddHold(int holds) override {
588     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
589   }
RemoveHold()590   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
591 
592  private:
593   friend class ClientCallbackReaderWriterFactory<Request, Response>;
594 
ClientCallbackReaderWriterImpl(grpc::internal::Call call,::grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)595   ClientCallbackReaderWriterImpl(grpc::internal::Call call,
596                                  ::grpc::ClientContext* context,
597                                  ClientBidiReactor<Request, Response>* reactor)
598       : context_(context),
599         call_(call),
600         reactor_(reactor),
601         start_corked_(context_->initial_metadata_corked_),
602         corked_write_needed_(start_corked_) {
603     this->BindReactor(reactor);
604 
605     // Set up the unchanging parts of the start, read, and write tags and ops.
606     start_tag_.Set(
607         call_.call(),
608         [this](bool ok) {
609           reactor_->OnReadInitialMetadataDone(
610               ok && !reactor_->InternalTrailersOnly(call_.call()));
611           MaybeFinish(/*from_reaction=*/true);
612         },
613         &start_ops_, /*can_inline=*/false);
614     start_ops_.RecvInitialMetadata(context_);
615     start_ops_.set_core_cq_tag(&start_tag_);
616 
617     write_tag_.Set(
618         call_.call(),
619         [this](bool ok) {
620           reactor_->OnWriteDone(ok);
621           MaybeFinish(/*from_reaction=*/true);
622         },
623         &write_ops_, /*can_inline=*/false);
624     write_ops_.set_core_cq_tag(&write_tag_);
625 
626     read_tag_.Set(
627         call_.call(),
628         [this](bool ok) {
629           reactor_->OnReadDone(ok);
630           MaybeFinish(/*from_reaction=*/true);
631         },
632         &read_ops_, /*can_inline=*/false);
633     read_ops_.set_core_cq_tag(&read_tag_);
634 
635     // Also set up the Finish tag and op set.
636     finish_tag_.Set(
637         call_.call(),
638         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
639         &finish_ops_,
640         /*can_inline=*/false);
641     finish_ops_.ClientRecvStatus(context_, &finish_status_);
642     finish_ops_.set_core_cq_tag(&finish_tag_);
643   }
644 
645   // MaybeFinish can be called from reactions or from user-initiated operations
646   // like StartCall or RemoveHold. If this is the last operation or hold on this
647   // object, it will invoke the OnDone reaction. If MaybeFinish was called from
648   // a reaction, it can call OnDone directly. If not, it would need to schedule
649   // OnDone onto an executor thread to avoid the possibility of deadlocking with
650   // any locks in the user code that invoked it.
MaybeFinish(bool from_reaction)651   void MaybeFinish(bool from_reaction) {
652     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
653                          1, std::memory_order_acq_rel) == 1)) {
654       ::grpc::Status s = std::move(finish_status_);
655       auto* reactor = reactor_;
656       auto* call = call_.call();
657       this->~ClientCallbackReaderWriterImpl();
658       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
659       if (GPR_LIKELY(from_reaction)) {
660         reactor->OnDone(s);
661       } else {
662         reactor->InternalScheduleOnDone(std::move(s));
663       }
664     }
665   }
666 
  // Client context handed to the receive ops (initial metadata and status).
  ::grpc::ClientContext* const context_;
  // The call on which every batch below is performed.
  grpc::internal::Call call_;
  // Reactor that receives the per-operation reactions and the final OnDone.
  ClientBidiReactor<Request, Response>* const reactor_;

  // Start batch (send + receive initial metadata) and its completion tag.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpRecvInitialMetadata>
      start_ops_;
  grpc::internal::CallbackWithSuccessTag start_tag_;
  // NOTE(review): initialized outside this view; presumably mirrors the
  // context's corked-initial-metadata setting as in ClientCallbackWriterImpl
  // -- confirm against the constructor.
  const bool start_corked_;
  bool corked_write_needed_;  // no lock needed since only accessed in
                              // Write/WritesDone which cannot be concurrent

  // Finish batch: receives the call status into finish_status_. Its tag only
  // calls MaybeFinish; the status is handed to OnDone from there.
  grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
  grpc::internal::CallbackWithSuccessTag finish_tag_;
  ::grpc::Status finish_status_;

  // Write batch; its tag reports completion through OnWriteDone.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpSendMessage,
                            grpc::internal::CallOpClientSendClose>
      write_ops_;
  grpc::internal::CallbackWithSuccessTag write_tag_;

  // NOTE(review): the tag for this batch is set up outside this view;
  // presumably used by WritesDone as in ClientCallbackWriterImpl -- confirm.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpClientSendClose>
      writes_done_ops_;
  grpc::internal::CallbackWithSuccessTag writes_done_tag_;

  // Read batch; its tag reports completion through OnReadDone.
  grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
      read_ops_;
  grpc::internal::CallbackWithSuccessTag read_tag_;

  // One flag per kind of batch. NOTE(review): presumably records operations
  // requested before StartCall so that StartCall can issue them, as the
  // Reader/Writer implementations in this file do -- confirm against the
  // methods of this class.
  struct StartCallBacklog {
    bool write_ops = false;
    bool writes_done_ops = false;
    bool read_ops = false;
  };
  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);

  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
  // MaybeFinish decrements this counter; the decrement that observes the value
  // 1 destroys the object and delivers OnDone.
  std::atomic<intptr_t> callbacks_outstanding_{3};
  std::atomic_bool started_{false};
  // Guards backlog_.
  grpc::internal::Mutex start_mu_;
709 };
710 
711 template <class Request, class Response>
712 class ClientCallbackReaderWriterFactory {
713  public:
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)714   static void Create(::grpc::ChannelInterface* channel,
715                      const ::grpc::internal::RpcMethod& method,
716                      ::grpc::ClientContext* context,
717                      ClientBidiReactor<Request, Response>* reactor) {
718     grpc::internal::Call call =
719         channel->CreateCall(method, context, channel->CallbackCQ());
720 
721     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
722     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
723         call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
724         ClientCallbackReaderWriterImpl<Request, Response>(call, context,
725                                                           reactor);
726   }
727 };
728 
729 template <class Response>
730 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
731  public:
732   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)733   static void operator delete(void* /*ptr*/, std::size_t size) {
734     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderImpl));
735   }
736 
737   // This operator should never be called as the memory should be freed as part
738   // of the arena destruction. It only exists to provide a matching operator
739   // delete to the operator new so that some compilers will not complain (see
740   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
741   // there are no tests catching the compiler warning.
delete(void *,void *)742   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
743 
StartCall()744   void StartCall() override {
745     // This call initiates two batches, plus any backlog, each with a callback
746     // 1. Send initial metadata (unless corked) + recv initial metadata
747     // 2. Any backlog
748     // 3. Recv trailing metadata
749 
750     start_tag_.Set(
751         call_.call(),
752         [this](bool ok) {
753           reactor_->OnReadInitialMetadataDone(
754               ok && !reactor_->InternalTrailersOnly(call_.call()));
755           MaybeFinish(/*from_reaction=*/true);
756         },
757         &start_ops_, /*can_inline=*/false);
758     start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
759                                    context_->initial_metadata_flags());
760     start_ops_.RecvInitialMetadata(context_);
761     start_ops_.set_core_cq_tag(&start_tag_);
762     call_.PerformOps(&start_ops_);
763 
764     // Also set up the read tag so it doesn't have to be set up each time
765     read_tag_.Set(
766         call_.call(),
767         [this](bool ok) {
768           reactor_->OnReadDone(ok);
769           MaybeFinish(/*from_reaction=*/true);
770         },
771         &read_ops_, /*can_inline=*/false);
772     read_ops_.set_core_cq_tag(&read_tag_);
773 
774     {
775       grpc::internal::MutexLock lock(&start_mu_);
776       if (backlog_.read_ops) {
777         call_.PerformOps(&read_ops_);
778       }
779       started_.store(true, std::memory_order_release);
780     }
781 
782     finish_tag_.Set(
783         call_.call(),
784         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
785         &finish_ops_, /*can_inline=*/false);
786     finish_ops_.ClientRecvStatus(context_, &finish_status_);
787     finish_ops_.set_core_cq_tag(&finish_tag_);
788     call_.PerformOps(&finish_ops_);
789   }
790 
Read(Response * msg)791   void Read(Response* msg) override {
792     read_ops_.RecvMessage(msg);
793     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
794     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
795       grpc::internal::MutexLock lock(&start_mu_);
796       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
797         backlog_.read_ops = true;
798         return;
799       }
800     }
801     call_.PerformOps(&read_ops_);
802   }
803 
AddHold(int holds)804   void AddHold(int holds) override {
805     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
806   }
RemoveHold()807   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
808 
809  private:
810   friend class ClientCallbackReaderFactory<Response>;
811 
812   template <class Request>
ClientCallbackReaderImpl(::grpc::internal::Call call,::grpc::ClientContext * context,Request * request,ClientReadReactor<Response> * reactor)813   ClientCallbackReaderImpl(::grpc::internal::Call call,
814                            ::grpc::ClientContext* context, Request* request,
815                            ClientReadReactor<Response>* reactor)
816       : context_(context), call_(call), reactor_(reactor) {
817     this->BindReactor(reactor);
818     // TODO(vjpai): don't assert
819     GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
820     start_ops_.ClientSendClose();
821   }
822 
823   // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)824   void MaybeFinish(bool from_reaction) {
825     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
826                          1, std::memory_order_acq_rel) == 1)) {
827       ::grpc::Status s = std::move(finish_status_);
828       auto* reactor = reactor_;
829       auto* call = call_.call();
830       this->~ClientCallbackReaderImpl();
831       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
832       if (GPR_LIKELY(from_reaction)) {
833         reactor->OnDone(s);
834       } else {
835         reactor->InternalScheduleOnDone(std::move(s));
836       }
837     }
838   }
839 
840   ::grpc::ClientContext* const context_;
841   grpc::internal::Call call_;
842   ClientReadReactor<Response>* const reactor_;
843 
844   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
845                             grpc::internal::CallOpSendMessage,
846                             grpc::internal::CallOpClientSendClose,
847                             grpc::internal::CallOpRecvInitialMetadata>
848       start_ops_;
849   grpc::internal::CallbackWithSuccessTag start_tag_;
850 
851   grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
852   grpc::internal::CallbackWithSuccessTag finish_tag_;
853   ::grpc::Status finish_status_;
854 
855   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
856       read_ops_;
857   grpc::internal::CallbackWithSuccessTag read_tag_;
858 
859   struct StartCallBacklog {
860     bool read_ops = false;
861   };
862   StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
863 
864   // Minimum of 2 callbacks to pre-register for start and finish
865   std::atomic<intptr_t> callbacks_outstanding_{2};
866   std::atomic_bool started_{false};
867   grpc::internal::Mutex start_mu_;
868 };
869 
870 template <class Response>
871 class ClientCallbackReaderFactory {
872  public:
873   template <class Request>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const Request * request,ClientReadReactor<Response> * reactor)874   static void Create(::grpc::ChannelInterface* channel,
875                      const ::grpc::internal::RpcMethod& method,
876                      ::grpc::ClientContext* context, const Request* request,
877                      ClientReadReactor<Response>* reactor) {
878     grpc::internal::Call call =
879         channel->CreateCall(method, context, channel->CallbackCQ());
880 
881     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
882     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
883         call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
884         ClientCallbackReaderImpl<Response>(call, context, request, reactor);
885   }
886 };
887 
888 template <class Request>
889 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
890  public:
891   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)892   static void operator delete(void* /*ptr*/, std::size_t size) {
893     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackWriterImpl));
894   }
895 
896   // This operator should never be called as the memory should be freed as part
897   // of the arena destruction. It only exists to provide a matching operator
898   // delete to the operator new so that some compilers will not complain (see
899   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
900   // there are no tests catching the compiler warning.
delete(void *,void *)901   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
902 
StartCall()903   void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
904     // This call initiates two batches, plus any backlog, each with a callback
905     // 1. Send initial metadata (unless corked) + recv initial metadata
906     // 2. Any backlog
907     // 3. Recv trailing metadata
908 
909     if (!start_corked_) {
910       start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
911                                      context_->initial_metadata_flags());
912     }
913     call_.PerformOps(&start_ops_);
914 
915     {
916       grpc::internal::MutexLock lock(&start_mu_);
917 
918       if (backlog_.write_ops) {
919         call_.PerformOps(&write_ops_);
920       }
921       if (backlog_.writes_done_ops) {
922         call_.PerformOps(&writes_done_ops_);
923       }
924       call_.PerformOps(&finish_ops_);
925       // The last thing in this critical section is to set started_ so that it
926       // can be used lock-free as well.
927       started_.store(true, std::memory_order_release);
928     }
929     // MaybeFinish outside the lock to make sure that destruction of this object
930     // doesn't take place while holding the lock (which would cause the lock to
931     // be released after destruction)
932     this->MaybeFinish(/*from_reaction=*/false);
933   }
934 
Write(const Request * msg,::grpc::WriteOptions options)935   void Write(const Request* msg, ::grpc::WriteOptions options)
936       ABSL_LOCKS_EXCLUDED(start_mu_) override {
937     if (GPR_UNLIKELY(options.is_last_message())) {
938       options.set_buffer_hint();
939       write_ops_.ClientSendClose();
940     }
941     // TODO(vjpai): don't assert
942     GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
943     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
944 
945     if (GPR_UNLIKELY(corked_write_needed_)) {
946       write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
947                                      context_->initial_metadata_flags());
948       corked_write_needed_ = false;
949     }
950 
951     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
952       grpc::internal::MutexLock lock(&start_mu_);
953       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
954         backlog_.write_ops = true;
955         return;
956       }
957     }
958     call_.PerformOps(&write_ops_);
959   }
960 
WritesDone()961   void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
962     writes_done_ops_.ClientSendClose();
963     writes_done_tag_.Set(
964         call_.call(),
965         [this](bool ok) {
966           reactor_->OnWritesDoneDone(ok);
967           MaybeFinish(/*from_reaction=*/true);
968         },
969         &writes_done_ops_, /*can_inline=*/false);
970     writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
971     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
972 
973     if (GPR_UNLIKELY(corked_write_needed_)) {
974       writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
975                                            context_->initial_metadata_flags());
976       corked_write_needed_ = false;
977     }
978 
979     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
980       grpc::internal::MutexLock lock(&start_mu_);
981       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
982         backlog_.writes_done_ops = true;
983         return;
984       }
985     }
986     call_.PerformOps(&writes_done_ops_);
987   }
988 
AddHold(int holds)989   void AddHold(int holds) override {
990     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
991   }
RemoveHold()992   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
993 
994  private:
995   friend class ClientCallbackWriterFactory<Request>;
996 
997   template <class Response>
ClientCallbackWriterImpl(::grpc::internal::Call call,::grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)998   ClientCallbackWriterImpl(::grpc::internal::Call call,
999                            ::grpc::ClientContext* context, Response* response,
1000                            ClientWriteReactor<Request>* reactor)
1001       : context_(context),
1002         call_(call),
1003         reactor_(reactor),
1004         start_corked_(context_->initial_metadata_corked_),
1005         corked_write_needed_(start_corked_) {
1006     this->BindReactor(reactor);
1007 
1008     // Set up the unchanging parts of the start and write tags and ops.
1009     start_tag_.Set(
1010         call_.call(),
1011         [this](bool ok) {
1012           reactor_->OnReadInitialMetadataDone(
1013               ok && !reactor_->InternalTrailersOnly(call_.call()));
1014           MaybeFinish(/*from_reaction=*/true);
1015         },
1016         &start_ops_, /*can_inline=*/false);
1017     start_ops_.RecvInitialMetadata(context_);
1018     start_ops_.set_core_cq_tag(&start_tag_);
1019 
1020     write_tag_.Set(
1021         call_.call(),
1022         [this](bool ok) {
1023           reactor_->OnWriteDone(ok);
1024           MaybeFinish(/*from_reaction=*/true);
1025         },
1026         &write_ops_, /*can_inline=*/false);
1027     write_ops_.set_core_cq_tag(&write_tag_);
1028 
1029     // Also set up the Finish tag and op set.
1030     finish_ops_.RecvMessage(response);
1031     finish_ops_.AllowNoMessage();
1032     finish_tag_.Set(
1033         call_.call(),
1034         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1035         &finish_ops_,
1036         /*can_inline=*/false);
1037     finish_ops_.ClientRecvStatus(context_, &finish_status_);
1038     finish_ops_.set_core_cq_tag(&finish_tag_);
1039   }
1040 
1041   // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)1042   void MaybeFinish(bool from_reaction) {
1043     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1044                          1, std::memory_order_acq_rel) == 1)) {
1045       ::grpc::Status s = std::move(finish_status_);
1046       auto* reactor = reactor_;
1047       auto* call = call_.call();
1048       this->~ClientCallbackWriterImpl();
1049       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1050       if (GPR_LIKELY(from_reaction)) {
1051         reactor->OnDone(s);
1052       } else {
1053         reactor->InternalScheduleOnDone(std::move(s));
1054       }
1055     }
1056   }
1057 
1058   ::grpc::ClientContext* const context_;
1059   grpc::internal::Call call_;
1060   ClientWriteReactor<Request>* const reactor_;
1061 
1062   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1063                             grpc::internal::CallOpRecvInitialMetadata>
1064       start_ops_;
1065   grpc::internal::CallbackWithSuccessTag start_tag_;
1066   const bool start_corked_;
1067   bool corked_write_needed_;  // no lock needed since only accessed in
1068                               // Write/WritesDone which cannot be concurrent
1069 
1070   grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1071                             grpc::internal::CallOpClientRecvStatus>
1072       finish_ops_;
1073   grpc::internal::CallbackWithSuccessTag finish_tag_;
1074   ::grpc::Status finish_status_;
1075 
1076   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1077                             grpc::internal::CallOpSendMessage,
1078                             grpc::internal::CallOpClientSendClose>
1079       write_ops_;
1080   grpc::internal::CallbackWithSuccessTag write_tag_;
1081 
1082   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1083                             grpc::internal::CallOpClientSendClose>
1084       writes_done_ops_;
1085   grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1086 
1087   struct StartCallBacklog {
1088     bool write_ops = false;
1089     bool writes_done_ops = false;
1090   };
1091   StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1092 
1093   // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1094   std::atomic<intptr_t> callbacks_outstanding_{3};
1095   std::atomic_bool started_{false};
1096   grpc::internal::Mutex start_mu_;
1097 };
1098 
1099 template <class Request>
1100 class ClientCallbackWriterFactory {
1101  public:
1102   template <class Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)1103   static void Create(::grpc::ChannelInterface* channel,
1104                      const ::grpc::internal::RpcMethod& method,
1105                      ::grpc::ClientContext* context, Response* response,
1106                      ClientWriteReactor<Request>* reactor) {
1107     grpc::internal::Call call =
1108         channel->CreateCall(method, context, channel->CallbackCQ());
1109 
1110     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1111     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1112         call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1113         ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1114   }
1115 };
1116 
1117 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1118  public:
1119   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)1120   static void operator delete(void* /*ptr*/, std::size_t size) {
1121     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1122   }
1123 
1124   // This operator should never be called as the memory should be freed as part
1125   // of the arena destruction. It only exists to provide a matching operator
1126   // delete to the operator new so that some compilers will not complain (see
1127   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1128   // there are no tests catching the compiler warning.
delete(void *,void *)1129   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1130 
StartCall()1131   void StartCall() override {
1132     // This call initiates two batches, each with a callback
1133     // 1. Send initial metadata + write + writes done + recv initial metadata
1134     // 2. Read message, recv trailing metadata
1135 
1136     start_tag_.Set(
1137         call_.call(),
1138         [this](bool ok) {
1139           reactor_->OnReadInitialMetadataDone(
1140               ok && !reactor_->InternalTrailersOnly(call_.call()));
1141           MaybeFinish();
1142         },
1143         &start_ops_, /*can_inline=*/false);
1144     start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1145                                    context_->initial_metadata_flags());
1146     start_ops_.RecvInitialMetadata(context_);
1147     start_ops_.set_core_cq_tag(&start_tag_);
1148     call_.PerformOps(&start_ops_);
1149 
1150     finish_tag_.Set(
1151         call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1152         /*can_inline=*/false);
1153     finish_ops_.ClientRecvStatus(context_, &finish_status_);
1154     finish_ops_.set_core_cq_tag(&finish_tag_);
1155     call_.PerformOps(&finish_ops_);
1156   }
1157 
1158  private:
1159   friend class ClientCallbackUnaryFactory;
1160 
1161   template <class Request, class Response>
ClientCallbackUnaryImpl(::grpc::internal::Call call,::grpc::ClientContext * context,Request * request,Response * response,ClientUnaryReactor * reactor)1162   ClientCallbackUnaryImpl(::grpc::internal::Call call,
1163                           ::grpc::ClientContext* context, Request* request,
1164                           Response* response, ClientUnaryReactor* reactor)
1165       : context_(context), call_(call), reactor_(reactor) {
1166     this->BindReactor(reactor);
1167     // TODO(vjpai): don't assert
1168     GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1169     start_ops_.ClientSendClose();
1170     finish_ops_.RecvMessage(response);
1171     finish_ops_.AllowNoMessage();
1172   }
1173 
1174   // In the unary case, MaybeFinish is only ever invoked from a
1175   // library-initiated reaction, so it will just directly call OnDone if this is
1176   // the last reaction for this RPC.
MaybeFinish()1177   void MaybeFinish() {
1178     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1179                          1, std::memory_order_acq_rel) == 1)) {
1180       ::grpc::Status s = std::move(finish_status_);
1181       auto* reactor = reactor_;
1182       auto* call = call_.call();
1183       this->~ClientCallbackUnaryImpl();
1184       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1185       reactor->OnDone(s);
1186     }
1187   }
1188 
1189   ::grpc::ClientContext* const context_;
1190   grpc::internal::Call call_;
1191   ClientUnaryReactor* const reactor_;
1192 
1193   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1194                             grpc::internal::CallOpSendMessage,
1195                             grpc::internal::CallOpClientSendClose,
1196                             grpc::internal::CallOpRecvInitialMetadata>
1197       start_ops_;
1198   grpc::internal::CallbackWithSuccessTag start_tag_;
1199 
1200   grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1201                             grpc::internal::CallOpClientRecvStatus>
1202       finish_ops_;
1203   grpc::internal::CallbackWithSuccessTag finish_tag_;
1204   ::grpc::Status finish_status_;
1205 
1206   // This call will have 2 callbacks: start and finish
1207   std::atomic<intptr_t> callbacks_outstanding_{2};
1208 };
1209 
1210 class ClientCallbackUnaryFactory {
1211  public:
1212   template <class Request, class Response, class BaseRequest = Request,
1213             class BaseResponse = Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const Request * request,Response * response,ClientUnaryReactor * reactor)1214   static void Create(::grpc::ChannelInterface* channel,
1215                      const ::grpc::internal::RpcMethod& method,
1216                      ::grpc::ClientContext* context, const Request* request,
1217                      Response* response, ClientUnaryReactor* reactor) {
1218     grpc::internal::Call call =
1219         channel->CreateCall(method, context, channel->CallbackCQ());
1220 
1221     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1222 
1223     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1224         call.call(), sizeof(ClientCallbackUnaryImpl)))
1225         ClientCallbackUnaryImpl(call, context,
1226                                 static_cast<const BaseRequest*>(request),
1227                                 static_cast<BaseResponse*>(response), reactor);
1228   }
1229 };
1230 
1231 }  // namespace internal
1232 }  // namespace grpc
1233 #endif  // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
1234