1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20
21 // IWYU pragma: private, include <grpcpp/support/client_callback.h>
22
23 #include <atomic>
24 #include <functional>
25
26 #include <grpcpp/impl/codegen/call.h>
27 #include <grpcpp/impl/codegen/call_op_set.h>
28 #include <grpcpp/impl/codegen/callback_common.h>
29 #include <grpcpp/impl/codegen/channel_interface.h>
30 #include <grpcpp/impl/codegen/config.h>
31 #include <grpcpp/impl/codegen/core_codegen_interface.h>
32 #include <grpcpp/impl/codegen/status.h>
33 #include <grpcpp/impl/codegen/sync.h>
34
35 namespace grpc {
36 class Channel;
37 class ClientContext;
38
39 namespace internal {
40 class RpcMethod;
41
42 /// Perform a callback-based unary call. May optionally specify the base
43 /// class of the Request and Response so that the internal calls and structures
44 /// below this may be based on those base classes and thus achieve code reuse
45 /// across different RPCs (e.g., for protobuf, MessageLite would be a base
46 /// class).
47 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
48 template <class InputMessage, class OutputMessage,
49 class BaseInputMessage = InputMessage,
50 class BaseOutputMessage = OutputMessage>
CallbackUnaryCall(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)51 void CallbackUnaryCall(::grpc::ChannelInterface* channel,
52 const ::grpc::internal::RpcMethod& method,
53 ::grpc::ClientContext* context,
54 const InputMessage* request, OutputMessage* result,
55 std::function<void(::grpc::Status)> on_completion) {
56 static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
57 "Invalid input message specification");
58 static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
59 "Invalid output message specification");
60 CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x(
61 channel, method, context, request, result, on_completion);
62 }
63
64 template <class InputMessage, class OutputMessage>
65 class CallbackUnaryCallImpl {
66 public:
CallbackUnaryCallImpl(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)67 CallbackUnaryCallImpl(::grpc::ChannelInterface* channel,
68 const ::grpc::internal::RpcMethod& method,
69 ::grpc::ClientContext* context,
70 const InputMessage* request, OutputMessage* result,
71 std::function<void(::grpc::Status)> on_completion) {
72 ::grpc::CompletionQueue* cq = channel->CallbackCQ();
73 GPR_CODEGEN_ASSERT(cq != nullptr);
74 grpc::internal::Call call(channel->CreateCall(method, context, cq));
75
76 using FullCallOpSet = grpc::internal::CallOpSet<
77 ::grpc::internal::CallOpSendInitialMetadata,
78 grpc::internal::CallOpSendMessage,
79 grpc::internal::CallOpRecvInitialMetadata,
80 grpc::internal::CallOpRecvMessage<OutputMessage>,
81 grpc::internal::CallOpClientSendClose,
82 grpc::internal::CallOpClientRecvStatus>;
83
84 struct OpSetAndTag {
85 FullCallOpSet opset;
86 grpc::internal::CallbackWithStatusTag tag;
87 };
88 const size_t alloc_sz = sizeof(OpSetAndTag);
89 auto* const alloced = static_cast<OpSetAndTag*>(
90 ::grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(),
91 alloc_sz));
92 auto* ops = new (&alloced->opset) FullCallOpSet;
93 auto* tag = new (&alloced->tag)
94 grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
95
96 // TODO(vjpai): Unify code with sync API as much as possible
97 ::grpc::Status s = ops->SendMessagePtr(request);
98 if (!s.ok()) {
99 tag->force_run(s);
100 return;
101 }
102 ops->SendInitialMetadata(&context->send_initial_metadata_,
103 context->initial_metadata_flags());
104 ops->RecvInitialMetadata(context);
105 ops->RecvMessage(result);
106 ops->AllowNoMessage();
107 ops->ClientSendClose();
108 ops->ClientRecvStatus(context, tag->status_ptr());
109 ops->set_core_cq_tag(tag);
110 call.PerformOps(ops);
111 }
112 };
113
114 // Base class for public API classes.
115 class ClientReactor {
116 public:
117 virtual ~ClientReactor() = default;
118
119 /// Called by the library when all operations associated with this RPC have
120 /// completed and all Holds have been removed. OnDone provides the RPC status
121 /// outcome for both successful and failed RPCs. If it is never called on an
122 /// RPC, it indicates an application-level problem (like failure to remove a
123 /// hold).
124 ///
125 /// \param[in] s The status outcome of this RPC
126 virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
127
128 /// InternalScheduleOnDone is not part of the API and is not meant to be
129 /// overridden. It is virtual to allow successful builds for certain bazel
130 /// build users that only want to depend on gRPC codegen headers and not the
131 /// full library (although this is not a generally-supported option). Although
132 /// the virtual call is slower than a direct call, this function is
133 /// heavyweight and the cost of the virtual call is not much in comparison.
134 /// This function may be removed or devirtualized in the future.
135 virtual void InternalScheduleOnDone(::grpc::Status s);
136
137 /// InternalTrailersOnly is not part of the API and is not meant to be
138 /// overridden. It is virtual to allow successful builds for certain bazel
139 /// build users that only want to depend on gRPC codegen headers and not the
140 /// full library (although this is not a generally-supported option). Although
141 /// the virtual call is slower than a direct call, this function is
142 /// heavyweight and the cost of the virtual call is not much in comparison.
143 /// This function may be removed or devirtualized in the future.
144 virtual bool InternalTrailersOnly(const grpc_call* call) const;
145 };
146
147 } // namespace internal
148
149 // Forward declarations
150 template <class Request, class Response>
151 class ClientBidiReactor;
152 template <class Response>
153 class ClientReadReactor;
154 template <class Request>
155 class ClientWriteReactor;
156 class ClientUnaryReactor;
157
158 // NOTE: The streaming objects are not actually implemented in the public API.
159 // These interfaces are provided for mocking only. Typical applications
160 // will interact exclusively with the reactors that they define.
161 template <class Request, class Response>
162 class ClientCallbackReaderWriter {
163 public:
~ClientCallbackReaderWriter()164 virtual ~ClientCallbackReaderWriter() {}
165 virtual void StartCall() = 0;
166 virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
167 virtual void WritesDone() = 0;
168 virtual void Read(Response* resp) = 0;
169 virtual void AddHold(int holds) = 0;
170 virtual void RemoveHold() = 0;
171
172 protected:
BindReactor(ClientBidiReactor<Request,Response> * reactor)173 void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
174 reactor->BindStream(this);
175 }
176 };
177
178 template <class Response>
179 class ClientCallbackReader {
180 public:
~ClientCallbackReader()181 virtual ~ClientCallbackReader() {}
182 virtual void StartCall() = 0;
183 virtual void Read(Response* resp) = 0;
184 virtual void AddHold(int holds) = 0;
185 virtual void RemoveHold() = 0;
186
187 protected:
BindReactor(ClientReadReactor<Response> * reactor)188 void BindReactor(ClientReadReactor<Response>* reactor) {
189 reactor->BindReader(this);
190 }
191 };
192
193 template <class Request>
194 class ClientCallbackWriter {
195 public:
~ClientCallbackWriter()196 virtual ~ClientCallbackWriter() {}
197 virtual void StartCall() = 0;
Write(const Request * req)198 void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
199 virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
WriteLast(const Request * req,::grpc::WriteOptions options)200 void WriteLast(const Request* req, ::grpc::WriteOptions options) {
201 Write(req, options.set_last_message());
202 }
203 virtual void WritesDone() = 0;
204
205 virtual void AddHold(int holds) = 0;
206 virtual void RemoveHold() = 0;
207
208 protected:
BindReactor(ClientWriteReactor<Request> * reactor)209 void BindReactor(ClientWriteReactor<Request>* reactor) {
210 reactor->BindWriter(this);
211 }
212 };
213
214 class ClientCallbackUnary {
215 public:
~ClientCallbackUnary()216 virtual ~ClientCallbackUnary() {}
217 virtual void StartCall() = 0;
218
219 protected:
220 void BindReactor(ClientUnaryReactor* reactor);
221 };
222
223 // The following classes are the reactor interfaces that are to be implemented
224 // by the user. They are passed in to the library as an argument to a call on a
225 // stub (either a codegen-ed call or a generic call). The streaming RPC is
226 // activated by calling StartCall, possibly after initiating StartRead,
227 // StartWrite, or AddHold operations on the streaming object. Note that none of
228 // the classes are pure; all reactions have a default empty reaction so that the
229 // user class only needs to override those reactions that it cares about.
230 // The reactor must be passed to the stub invocation before any of the below
231 // operations can be called and its reactions will be invoked by the library in
232 // response to the completion of various operations. Reactions must not include
233 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
234 // waiting on condition variables). Reactions may be invoked concurrently,
235 // except that OnDone is called after all others (assuming proper API usage).
236 // The reactor may not be deleted until OnDone is called.
237
238 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
239 template <class Request, class Response>
240 class ClientBidiReactor : public internal::ClientReactor {
241 public:
242 /// Activate the RPC and initiate any reads or writes that have been Start'ed
243 /// before this call. All streaming RPCs issued by the client MUST have
244 /// StartCall invoked on them (even if they are canceled) as this call is the
245 /// activation of their lifecycle.
StartCall()246 void StartCall() { stream_->StartCall(); }
247
248 /// Initiate a read operation (or post it for later initiation if StartCall
249 /// has not yet been invoked).
250 ///
251 /// \param[out] resp Where to eventually store the read message. Valid when
252 /// the library calls OnReadDone
StartRead(Response * resp)253 void StartRead(Response* resp) { stream_->Read(resp); }
254
255 /// Initiate a write operation (or post it for later initiation if StartCall
256 /// has not yet been invoked).
257 ///
258 /// \param[in] req The message to be written. The library does not take
259 /// ownership but the caller must ensure that the message is
260 /// not deleted or modified until OnWriteDone is called.
StartWrite(const Request * req)261 void StartWrite(const Request* req) {
262 StartWrite(req, ::grpc::WriteOptions());
263 }
264
265 /// Initiate/post a write operation with specified options.
266 ///
267 /// \param[in] req The message to be written. The library does not take
268 /// ownership but the caller must ensure that the message is
269 /// not deleted or modified until OnWriteDone is called.
270 /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Request * req,::grpc::WriteOptions options)271 void StartWrite(const Request* req, ::grpc::WriteOptions options) {
272 stream_->Write(req, options);
273 }
274
275 /// Initiate/post a write operation with specified options and an indication
276 /// that this is the last write (like StartWrite and StartWritesDone, merged).
277 /// Note that calling this means that no more calls to StartWrite,
278 /// StartWriteLast, or StartWritesDone are allowed.
279 ///
280 /// \param[in] req The message to be written. The library does not take
281 /// ownership but the caller must ensure that the message is
282 /// not deleted or modified until OnWriteDone is called.
283 /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Request * req,::grpc::WriteOptions options)284 void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
285 StartWrite(req, options.set_last_message());
286 }
287
288 /// Indicate that the RPC will have no more write operations. This can only be
289 /// issued once for a given RPC. This is not required or allowed if
290 /// StartWriteLast is used since that already has the same implication.
291 /// Note that calling this means that no more calls to StartWrite,
292 /// StartWriteLast, or StartWritesDone are allowed.
StartWritesDone()293 void StartWritesDone() { stream_->WritesDone(); }
294
295 /// Holds are needed if (and only if) this stream has operations that take
296 /// place on it after StartCall but from outside one of the reactions
297 /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
298 ///
299 /// Holds must be added before calling StartCall. If a stream still has a hold
300 /// in place, its resources will not be destroyed even if the status has
301 /// already come in from the wire and there are currently no active callbacks
302 /// outstanding. Similarly, the stream will not call OnDone if there are still
303 /// holds on it.
304 ///
305 /// For example, if a StartRead or StartWrite operation is going to be
306 /// initiated from elsewhere in the application, the application should call
307 /// AddHold or AddMultipleHolds before StartCall. If there is going to be,
308 /// for example, a read-flow and a write-flow taking place outside the
309 /// reactions, then call AddMultipleHolds(2) before StartCall. When the
310 /// application knows that it won't issue any more read operations (such as
311 /// when a read comes back as not ok), it should issue a RemoveHold(). It
312 /// should also call RemoveHold() again after it does StartWriteLast or
313 /// StartWritesDone that indicates that there will be no more write ops.
314 /// The number of RemoveHold calls must match the total number of AddHold
315 /// calls plus the number of holds added by AddMultipleHolds.
316 /// The argument to AddMultipleHolds must be positive.
AddHold()317 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)318 void AddMultipleHolds(int holds) {
319 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
320 stream_->AddHold(holds);
321 }
RemoveHold()322 void RemoveHold() { stream_->RemoveHold(); }
323
324 /// Notifies the application that all operations associated with this RPC
325 /// have completed and all Holds have been removed. OnDone provides the RPC
326 /// status outcome for both successful and failed RPCs and will be called in
327 /// all cases. If it is not called, it indicates an application-level problem
328 /// (like failure to remove a hold).
329 ///
330 /// \param[in] s The status outcome of this RPC
OnDone(const::grpc::Status &)331 void OnDone(const ::grpc::Status& /*s*/) override {}
332
333 /// Notifies the application that a read of initial metadata from the
334 /// server is done. If the application chooses not to implement this method,
335 /// it can assume that the initial metadata has been read before the first
336 /// call of OnReadDone or OnDone.
337 ///
338 /// \param[in] ok Was the initial metadata read successfully? If false, no
339 /// new read/write operation will succeed, and any further
340 /// Start* operations should not be called.
OnReadInitialMetadataDone(bool)341 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
342
343 /// Notifies the application that a StartRead operation completed.
344 ///
345 /// \param[in] ok Was it successful? If false, no new read/write operation
346 /// will succeed, and any further Start* should not be called.
OnReadDone(bool)347 virtual void OnReadDone(bool /*ok*/) {}
348
349 /// Notifies the application that a StartWrite or StartWriteLast operation
350 /// completed.
351 ///
352 /// \param[in] ok Was it successful? If false, no new read/write operation
353 /// will succeed, and any further Start* should not be called.
OnWriteDone(bool)354 virtual void OnWriteDone(bool /*ok*/) {}
355
356 /// Notifies the application that a StartWritesDone operation completed. Note
357 /// that this is only used on explicit StartWritesDone operations and not for
358 /// those that are implicitly invoked as part of a StartWriteLast.
359 ///
360 /// \param[in] ok Was it successful? If false, the application will later see
361 /// the failure reflected as a bad status in OnDone and no
362 /// further Start* should be called.
OnWritesDoneDone(bool)363 virtual void OnWritesDoneDone(bool /*ok*/) {}
364
365 private:
366 friend class ClientCallbackReaderWriter<Request, Response>;
BindStream(ClientCallbackReaderWriter<Request,Response> * stream)367 void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
368 stream_ = stream;
369 }
370 ClientCallbackReaderWriter<Request, Response>* stream_;
371 };
372
373 /// \a ClientReadReactor is the interface for a server-streaming RPC.
374 /// All public methods behave as in ClientBidiReactor.
375 template <class Response>
376 class ClientReadReactor : public internal::ClientReactor {
377 public:
StartCall()378 void StartCall() { reader_->StartCall(); }
StartRead(Response * resp)379 void StartRead(Response* resp) { reader_->Read(resp); }
380
AddHold()381 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)382 void AddMultipleHolds(int holds) {
383 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
384 reader_->AddHold(holds);
385 }
RemoveHold()386 void RemoveHold() { reader_->RemoveHold(); }
387
OnDone(const::grpc::Status &)388 void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)389 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)390 virtual void OnReadDone(bool /*ok*/) {}
391
392 private:
393 friend class ClientCallbackReader<Response>;
BindReader(ClientCallbackReader<Response> * reader)394 void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
395 ClientCallbackReader<Response>* reader_;
396 };
397
398 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
399 /// All public methods behave as in ClientBidiReactor.
400 template <class Request>
401 class ClientWriteReactor : public internal::ClientReactor {
402 public:
StartCall()403 void StartCall() { writer_->StartCall(); }
StartWrite(const Request * req)404 void StartWrite(const Request* req) {
405 StartWrite(req, ::grpc::WriteOptions());
406 }
StartWrite(const Request * req,::grpc::WriteOptions options)407 void StartWrite(const Request* req, ::grpc::WriteOptions options) {
408 writer_->Write(req, options);
409 }
StartWriteLast(const Request * req,::grpc::WriteOptions options)410 void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
411 StartWrite(req, options.set_last_message());
412 }
StartWritesDone()413 void StartWritesDone() { writer_->WritesDone(); }
414
AddHold()415 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)416 void AddMultipleHolds(int holds) {
417 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
418 writer_->AddHold(holds);
419 }
RemoveHold()420 void RemoveHold() { writer_->RemoveHold(); }
421
OnDone(const::grpc::Status &)422 void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)423 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)424 virtual void OnWriteDone(bool /*ok*/) {}
OnWritesDoneDone(bool)425 virtual void OnWritesDoneDone(bool /*ok*/) {}
426
427 private:
428 friend class ClientCallbackWriter<Request>;
BindWriter(ClientCallbackWriter<Request> * writer)429 void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
430
431 ClientCallbackWriter<Request>* writer_;
432 };
433
434 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
435 /// This is _not_ a common way of invoking a unary RPC. In practice, this
436 /// option should be used only if the unary RPC wants to receive initial
437 /// metadata without waiting for the response to complete. Most deployments of
438 /// RPC systems do not use this option, but it is needed for generality.
439 /// All public methods behave as in ClientBidiReactor.
440 /// StartCall is included for consistency with the other reactor flavors: even
441 /// though there are no StartRead or StartWrite operations to queue before the
442 /// call (that is part of the unary call itself) and there is no reactor object
443 /// being created as a result of this call, we keep a consistent 2-phase
444 /// initiation API among all the reactor flavors.
445 class ClientUnaryReactor : public internal::ClientReactor {
446 public:
StartCall()447 void StartCall() { call_->StartCall(); }
OnDone(const::grpc::Status &)448 void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)449 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
450
451 private:
452 friend class ClientCallbackUnary;
BindCall(ClientCallbackUnary * call)453 void BindCall(ClientCallbackUnary* call) { call_ = call; }
454 ClientCallbackUnary* call_;
455 };
456
457 // Define function out-of-line from class to avoid forward declaration issue
BindReactor(ClientUnaryReactor * reactor)458 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
459 reactor->BindCall(this);
460 }
461
462 namespace internal {
463
464 // Forward declare factory classes for friendship
465 template <class Request, class Response>
466 class ClientCallbackReaderWriterFactory;
467 template <class Response>
468 class ClientCallbackReaderFactory;
469 template <class Request>
470 class ClientCallbackWriterFactory;
471
472 template <class Request, class Response>
473 class ClientCallbackReaderWriterImpl
474 : public ClientCallbackReaderWriter<Request, Response> {
475 public:
476 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)477 static void operator delete(void* /*ptr*/, std::size_t size) {
478 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
479 }
480
481 // This operator should never be called as the memory should be freed as part
482 // of the arena destruction. It only exists to provide a matching operator
483 // delete to the operator new so that some compilers will not complain (see
484 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
485 // there are no tests catching the compiler warning.
delete(void *,void *)486 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
487
StartCall()488 void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
489 // This call initiates two batches, plus any backlog, each with a callback
490 // 1. Send initial metadata (unless corked) + recv initial metadata
491 // 2. Any read backlog
492 // 3. Any write backlog
493 // 4. Recv trailing metadata (unless corked)
494 if (!start_corked_) {
495 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
496 context_->initial_metadata_flags());
497 }
498
499 call_.PerformOps(&start_ops_);
500
501 {
502 grpc::internal::MutexLock lock(&start_mu_);
503
504 if (backlog_.read_ops) {
505 call_.PerformOps(&read_ops_);
506 }
507 if (backlog_.write_ops) {
508 call_.PerformOps(&write_ops_);
509 }
510 if (backlog_.writes_done_ops) {
511 call_.PerformOps(&writes_done_ops_);
512 }
513 call_.PerformOps(&finish_ops_);
514 // The last thing in this critical section is to set started_ so that it
515 // can be used lock-free as well.
516 started_.store(true, std::memory_order_release);
517 }
518 // MaybeFinish outside the lock to make sure that destruction of this object
519 // doesn't take place while holding the lock (which would cause the lock to
520 // be released after destruction)
521 this->MaybeFinish(/*from_reaction=*/false);
522 }
523
Read(Response * msg)524 void Read(Response* msg) override {
525 read_ops_.RecvMessage(msg);
526 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
527 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
528 grpc::internal::MutexLock lock(&start_mu_);
529 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
530 backlog_.read_ops = true;
531 return;
532 }
533 }
534 call_.PerformOps(&read_ops_);
535 }
536
Write(const Request * msg,::grpc::WriteOptions options)537 void Write(const Request* msg, ::grpc::WriteOptions options)
538 ABSL_LOCKS_EXCLUDED(start_mu_) override {
539 if (options.is_last_message()) {
540 options.set_buffer_hint();
541 write_ops_.ClientSendClose();
542 }
543 // TODO(vjpai): don't assert
544 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
545 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
546 if (GPR_UNLIKELY(corked_write_needed_)) {
547 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
548 context_->initial_metadata_flags());
549 corked_write_needed_ = false;
550 }
551
552 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
553 grpc::internal::MutexLock lock(&start_mu_);
554 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
555 backlog_.write_ops = true;
556 return;
557 }
558 }
559 call_.PerformOps(&write_ops_);
560 }
WritesDone()561 void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
562 writes_done_ops_.ClientSendClose();
563 writes_done_tag_.Set(
564 call_.call(),
565 [this](bool ok) {
566 reactor_->OnWritesDoneDone(ok);
567 MaybeFinish(/*from_reaction=*/true);
568 },
569 &writes_done_ops_, /*can_inline=*/false);
570 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
571 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
572 if (GPR_UNLIKELY(corked_write_needed_)) {
573 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
574 context_->initial_metadata_flags());
575 corked_write_needed_ = false;
576 }
577 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
578 grpc::internal::MutexLock lock(&start_mu_);
579 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
580 backlog_.writes_done_ops = true;
581 return;
582 }
583 }
584 call_.PerformOps(&writes_done_ops_);
585 }
586
AddHold(int holds)587 void AddHold(int holds) override {
588 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
589 }
RemoveHold()590 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
591
592 private:
593 friend class ClientCallbackReaderWriterFactory<Request, Response>;
594
ClientCallbackReaderWriterImpl(grpc::internal::Call call,::grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)595 ClientCallbackReaderWriterImpl(grpc::internal::Call call,
596 ::grpc::ClientContext* context,
597 ClientBidiReactor<Request, Response>* reactor)
598 : context_(context),
599 call_(call),
600 reactor_(reactor),
601 start_corked_(context_->initial_metadata_corked_),
602 corked_write_needed_(start_corked_) {
603 this->BindReactor(reactor);
604
605 // Set up the unchanging parts of the start, read, and write tags and ops.
606 start_tag_.Set(
607 call_.call(),
608 [this](bool ok) {
609 reactor_->OnReadInitialMetadataDone(
610 ok && !reactor_->InternalTrailersOnly(call_.call()));
611 MaybeFinish(/*from_reaction=*/true);
612 },
613 &start_ops_, /*can_inline=*/false);
614 start_ops_.RecvInitialMetadata(context_);
615 start_ops_.set_core_cq_tag(&start_tag_);
616
617 write_tag_.Set(
618 call_.call(),
619 [this](bool ok) {
620 reactor_->OnWriteDone(ok);
621 MaybeFinish(/*from_reaction=*/true);
622 },
623 &write_ops_, /*can_inline=*/false);
624 write_ops_.set_core_cq_tag(&write_tag_);
625
626 read_tag_.Set(
627 call_.call(),
628 [this](bool ok) {
629 reactor_->OnReadDone(ok);
630 MaybeFinish(/*from_reaction=*/true);
631 },
632 &read_ops_, /*can_inline=*/false);
633 read_ops_.set_core_cq_tag(&read_tag_);
634
635 // Also set up the Finish tag and op set.
636 finish_tag_.Set(
637 call_.call(),
638 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
639 &finish_ops_,
640 /*can_inline=*/false);
641 finish_ops_.ClientRecvStatus(context_, &finish_status_);
642 finish_ops_.set_core_cq_tag(&finish_tag_);
643 }
644
645 // MaybeFinish can be called from reactions or from user-initiated operations
646 // like StartCall or RemoveHold. If this is the last operation or hold on this
647 // object, it will invoke the OnDone reaction. If MaybeFinish was called from
648 // a reaction, it can call OnDone directly. If not, it would need to schedule
649 // OnDone onto an executor thread to avoid the possibility of deadlocking with
650 // any locks in the user code that invoked it.
MaybeFinish(bool from_reaction)651 void MaybeFinish(bool from_reaction) {
652 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
653 1, std::memory_order_acq_rel) == 1)) {
654 ::grpc::Status s = std::move(finish_status_);
655 auto* reactor = reactor_;
656 auto* call = call_.call();
657 this->~ClientCallbackReaderWriterImpl();
658 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
659 if (GPR_LIKELY(from_reaction)) {
660 reactor->OnDone(s);
661 } else {
662 reactor->InternalScheduleOnDone(std::move(s));
663 }
664 }
665 }
666
667 ::grpc::ClientContext* const context_;
668 grpc::internal::Call call_;
669 ClientBidiReactor<Request, Response>* const reactor_;
670
671 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
672 grpc::internal::CallOpRecvInitialMetadata>
673 start_ops_;
674 grpc::internal::CallbackWithSuccessTag start_tag_;
675 const bool start_corked_;
676 bool corked_write_needed_; // no lock needed since only accessed in
677 // Write/WritesDone which cannot be concurrent
678
679 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
680 grpc::internal::CallbackWithSuccessTag finish_tag_;
681 ::grpc::Status finish_status_;
682
683 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
684 grpc::internal::CallOpSendMessage,
685 grpc::internal::CallOpClientSendClose>
686 write_ops_;
687 grpc::internal::CallbackWithSuccessTag write_tag_;
688
689 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
690 grpc::internal::CallOpClientSendClose>
691 writes_done_ops_;
692 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
693
694 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
695 read_ops_;
696 grpc::internal::CallbackWithSuccessTag read_tag_;
697
698 struct StartCallBacklog {
699 bool write_ops = false;
700 bool writes_done_ops = false;
701 bool read_ops = false;
702 };
703 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
704
705 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
706 std::atomic<intptr_t> callbacks_outstanding_{3};
707 std::atomic_bool started_{false};
708 grpc::internal::Mutex start_mu_;
709 };
710
711 template <class Request, class Response>
712 class ClientCallbackReaderWriterFactory {
713 public:
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)714 static void Create(::grpc::ChannelInterface* channel,
715 const ::grpc::internal::RpcMethod& method,
716 ::grpc::ClientContext* context,
717 ClientBidiReactor<Request, Response>* reactor) {
718 grpc::internal::Call call =
719 channel->CreateCall(method, context, channel->CallbackCQ());
720
721 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
722 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
723 call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
724 ClientCallbackReaderWriterImpl<Request, Response>(call, context,
725 reactor);
726 }
727 };
728
729 template <class Response>
730 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
731 public:
732 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)733 static void operator delete(void* /*ptr*/, std::size_t size) {
734 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderImpl));
735 }
736
737 // This operator should never be called as the memory should be freed as part
738 // of the arena destruction. It only exists to provide a matching operator
739 // delete to the operator new so that some compilers will not complain (see
740 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
741 // there are no tests catching the compiler warning.
delete(void *,void *)742 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
743
StartCall()744 void StartCall() override {
745 // This call initiates two batches, plus any backlog, each with a callback
746 // 1. Send initial metadata (unless corked) + recv initial metadata
747 // 2. Any backlog
748 // 3. Recv trailing metadata
749
750 start_tag_.Set(
751 call_.call(),
752 [this](bool ok) {
753 reactor_->OnReadInitialMetadataDone(
754 ok && !reactor_->InternalTrailersOnly(call_.call()));
755 MaybeFinish(/*from_reaction=*/true);
756 },
757 &start_ops_, /*can_inline=*/false);
758 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
759 context_->initial_metadata_flags());
760 start_ops_.RecvInitialMetadata(context_);
761 start_ops_.set_core_cq_tag(&start_tag_);
762 call_.PerformOps(&start_ops_);
763
764 // Also set up the read tag so it doesn't have to be set up each time
765 read_tag_.Set(
766 call_.call(),
767 [this](bool ok) {
768 reactor_->OnReadDone(ok);
769 MaybeFinish(/*from_reaction=*/true);
770 },
771 &read_ops_, /*can_inline=*/false);
772 read_ops_.set_core_cq_tag(&read_tag_);
773
774 {
775 grpc::internal::MutexLock lock(&start_mu_);
776 if (backlog_.read_ops) {
777 call_.PerformOps(&read_ops_);
778 }
779 started_.store(true, std::memory_order_release);
780 }
781
782 finish_tag_.Set(
783 call_.call(),
784 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
785 &finish_ops_, /*can_inline=*/false);
786 finish_ops_.ClientRecvStatus(context_, &finish_status_);
787 finish_ops_.set_core_cq_tag(&finish_tag_);
788 call_.PerformOps(&finish_ops_);
789 }
790
Read(Response * msg)791 void Read(Response* msg) override {
792 read_ops_.RecvMessage(msg);
793 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
794 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
795 grpc::internal::MutexLock lock(&start_mu_);
796 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
797 backlog_.read_ops = true;
798 return;
799 }
800 }
801 call_.PerformOps(&read_ops_);
802 }
803
AddHold(int holds)804 void AddHold(int holds) override {
805 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
806 }
RemoveHold()807 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
808
809 private:
810 friend class ClientCallbackReaderFactory<Response>;
811
812 template <class Request>
ClientCallbackReaderImpl(::grpc::internal::Call call,::grpc::ClientContext * context,Request * request,ClientReadReactor<Response> * reactor)813 ClientCallbackReaderImpl(::grpc::internal::Call call,
814 ::grpc::ClientContext* context, Request* request,
815 ClientReadReactor<Response>* reactor)
816 : context_(context), call_(call), reactor_(reactor) {
817 this->BindReactor(reactor);
818 // TODO(vjpai): don't assert
819 GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
820 start_ops_.ClientSendClose();
821 }
822
823 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)824 void MaybeFinish(bool from_reaction) {
825 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
826 1, std::memory_order_acq_rel) == 1)) {
827 ::grpc::Status s = std::move(finish_status_);
828 auto* reactor = reactor_;
829 auto* call = call_.call();
830 this->~ClientCallbackReaderImpl();
831 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
832 if (GPR_LIKELY(from_reaction)) {
833 reactor->OnDone(s);
834 } else {
835 reactor->InternalScheduleOnDone(std::move(s));
836 }
837 }
838 }
839
840 ::grpc::ClientContext* const context_;
841 grpc::internal::Call call_;
842 ClientReadReactor<Response>* const reactor_;
843
844 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
845 grpc::internal::CallOpSendMessage,
846 grpc::internal::CallOpClientSendClose,
847 grpc::internal::CallOpRecvInitialMetadata>
848 start_ops_;
849 grpc::internal::CallbackWithSuccessTag start_tag_;
850
851 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
852 grpc::internal::CallbackWithSuccessTag finish_tag_;
853 ::grpc::Status finish_status_;
854
855 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
856 read_ops_;
857 grpc::internal::CallbackWithSuccessTag read_tag_;
858
859 struct StartCallBacklog {
860 bool read_ops = false;
861 };
862 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
863
864 // Minimum of 2 callbacks to pre-register for start and finish
865 std::atomic<intptr_t> callbacks_outstanding_{2};
866 std::atomic_bool started_{false};
867 grpc::internal::Mutex start_mu_;
868 };
869
870 template <class Response>
871 class ClientCallbackReaderFactory {
872 public:
873 template <class Request>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const Request * request,ClientReadReactor<Response> * reactor)874 static void Create(::grpc::ChannelInterface* channel,
875 const ::grpc::internal::RpcMethod& method,
876 ::grpc::ClientContext* context, const Request* request,
877 ClientReadReactor<Response>* reactor) {
878 grpc::internal::Call call =
879 channel->CreateCall(method, context, channel->CallbackCQ());
880
881 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
882 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
883 call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
884 ClientCallbackReaderImpl<Response>(call, context, request, reactor);
885 }
886 };
887
888 template <class Request>
889 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
890 public:
891 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)892 static void operator delete(void* /*ptr*/, std::size_t size) {
893 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackWriterImpl));
894 }
895
896 // This operator should never be called as the memory should be freed as part
897 // of the arena destruction. It only exists to provide a matching operator
898 // delete to the operator new so that some compilers will not complain (see
899 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
900 // there are no tests catching the compiler warning.
delete(void *,void *)901 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
902
StartCall()903 void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
904 // This call initiates two batches, plus any backlog, each with a callback
905 // 1. Send initial metadata (unless corked) + recv initial metadata
906 // 2. Any backlog
907 // 3. Recv trailing metadata
908
909 if (!start_corked_) {
910 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
911 context_->initial_metadata_flags());
912 }
913 call_.PerformOps(&start_ops_);
914
915 {
916 grpc::internal::MutexLock lock(&start_mu_);
917
918 if (backlog_.write_ops) {
919 call_.PerformOps(&write_ops_);
920 }
921 if (backlog_.writes_done_ops) {
922 call_.PerformOps(&writes_done_ops_);
923 }
924 call_.PerformOps(&finish_ops_);
925 // The last thing in this critical section is to set started_ so that it
926 // can be used lock-free as well.
927 started_.store(true, std::memory_order_release);
928 }
929 // MaybeFinish outside the lock to make sure that destruction of this object
930 // doesn't take place while holding the lock (which would cause the lock to
931 // be released after destruction)
932 this->MaybeFinish(/*from_reaction=*/false);
933 }
934
Write(const Request * msg,::grpc::WriteOptions options)935 void Write(const Request* msg, ::grpc::WriteOptions options)
936 ABSL_LOCKS_EXCLUDED(start_mu_) override {
937 if (GPR_UNLIKELY(options.is_last_message())) {
938 options.set_buffer_hint();
939 write_ops_.ClientSendClose();
940 }
941 // TODO(vjpai): don't assert
942 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
943 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
944
945 if (GPR_UNLIKELY(corked_write_needed_)) {
946 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
947 context_->initial_metadata_flags());
948 corked_write_needed_ = false;
949 }
950
951 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
952 grpc::internal::MutexLock lock(&start_mu_);
953 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
954 backlog_.write_ops = true;
955 return;
956 }
957 }
958 call_.PerformOps(&write_ops_);
959 }
960
WritesDone()961 void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
962 writes_done_ops_.ClientSendClose();
963 writes_done_tag_.Set(
964 call_.call(),
965 [this](bool ok) {
966 reactor_->OnWritesDoneDone(ok);
967 MaybeFinish(/*from_reaction=*/true);
968 },
969 &writes_done_ops_, /*can_inline=*/false);
970 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
971 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
972
973 if (GPR_UNLIKELY(corked_write_needed_)) {
974 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
975 context_->initial_metadata_flags());
976 corked_write_needed_ = false;
977 }
978
979 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
980 grpc::internal::MutexLock lock(&start_mu_);
981 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
982 backlog_.writes_done_ops = true;
983 return;
984 }
985 }
986 call_.PerformOps(&writes_done_ops_);
987 }
988
AddHold(int holds)989 void AddHold(int holds) override {
990 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
991 }
RemoveHold()992 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
993
994 private:
995 friend class ClientCallbackWriterFactory<Request>;
996
997 template <class Response>
ClientCallbackWriterImpl(::grpc::internal::Call call,::grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)998 ClientCallbackWriterImpl(::grpc::internal::Call call,
999 ::grpc::ClientContext* context, Response* response,
1000 ClientWriteReactor<Request>* reactor)
1001 : context_(context),
1002 call_(call),
1003 reactor_(reactor),
1004 start_corked_(context_->initial_metadata_corked_),
1005 corked_write_needed_(start_corked_) {
1006 this->BindReactor(reactor);
1007
1008 // Set up the unchanging parts of the start and write tags and ops.
1009 start_tag_.Set(
1010 call_.call(),
1011 [this](bool ok) {
1012 reactor_->OnReadInitialMetadataDone(
1013 ok && !reactor_->InternalTrailersOnly(call_.call()));
1014 MaybeFinish(/*from_reaction=*/true);
1015 },
1016 &start_ops_, /*can_inline=*/false);
1017 start_ops_.RecvInitialMetadata(context_);
1018 start_ops_.set_core_cq_tag(&start_tag_);
1019
1020 write_tag_.Set(
1021 call_.call(),
1022 [this](bool ok) {
1023 reactor_->OnWriteDone(ok);
1024 MaybeFinish(/*from_reaction=*/true);
1025 },
1026 &write_ops_, /*can_inline=*/false);
1027 write_ops_.set_core_cq_tag(&write_tag_);
1028
1029 // Also set up the Finish tag and op set.
1030 finish_ops_.RecvMessage(response);
1031 finish_ops_.AllowNoMessage();
1032 finish_tag_.Set(
1033 call_.call(),
1034 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1035 &finish_ops_,
1036 /*can_inline=*/false);
1037 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1038 finish_ops_.set_core_cq_tag(&finish_tag_);
1039 }
1040
1041 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)1042 void MaybeFinish(bool from_reaction) {
1043 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1044 1, std::memory_order_acq_rel) == 1)) {
1045 ::grpc::Status s = std::move(finish_status_);
1046 auto* reactor = reactor_;
1047 auto* call = call_.call();
1048 this->~ClientCallbackWriterImpl();
1049 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1050 if (GPR_LIKELY(from_reaction)) {
1051 reactor->OnDone(s);
1052 } else {
1053 reactor->InternalScheduleOnDone(std::move(s));
1054 }
1055 }
1056 }
1057
1058 ::grpc::ClientContext* const context_;
1059 grpc::internal::Call call_;
1060 ClientWriteReactor<Request>* const reactor_;
1061
1062 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1063 grpc::internal::CallOpRecvInitialMetadata>
1064 start_ops_;
1065 grpc::internal::CallbackWithSuccessTag start_tag_;
1066 const bool start_corked_;
1067 bool corked_write_needed_; // no lock needed since only accessed in
1068 // Write/WritesDone which cannot be concurrent
1069
1070 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1071 grpc::internal::CallOpClientRecvStatus>
1072 finish_ops_;
1073 grpc::internal::CallbackWithSuccessTag finish_tag_;
1074 ::grpc::Status finish_status_;
1075
1076 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1077 grpc::internal::CallOpSendMessage,
1078 grpc::internal::CallOpClientSendClose>
1079 write_ops_;
1080 grpc::internal::CallbackWithSuccessTag write_tag_;
1081
1082 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1083 grpc::internal::CallOpClientSendClose>
1084 writes_done_ops_;
1085 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1086
1087 struct StartCallBacklog {
1088 bool write_ops = false;
1089 bool writes_done_ops = false;
1090 };
1091 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1092
1093 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1094 std::atomic<intptr_t> callbacks_outstanding_{3};
1095 std::atomic_bool started_{false};
1096 grpc::internal::Mutex start_mu_;
1097 };
1098
1099 template <class Request>
1100 class ClientCallbackWriterFactory {
1101 public:
1102 template <class Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)1103 static void Create(::grpc::ChannelInterface* channel,
1104 const ::grpc::internal::RpcMethod& method,
1105 ::grpc::ClientContext* context, Response* response,
1106 ClientWriteReactor<Request>* reactor) {
1107 grpc::internal::Call call =
1108 channel->CreateCall(method, context, channel->CallbackCQ());
1109
1110 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1111 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1112 call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1113 ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1114 }
1115 };
1116
1117 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1118 public:
1119 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)1120 static void operator delete(void* /*ptr*/, std::size_t size) {
1121 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1122 }
1123
1124 // This operator should never be called as the memory should be freed as part
1125 // of the arena destruction. It only exists to provide a matching operator
1126 // delete to the operator new so that some compilers will not complain (see
1127 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1128 // there are no tests catching the compiler warning.
delete(void *,void *)1129 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1130
StartCall()1131 void StartCall() override {
1132 // This call initiates two batches, each with a callback
1133 // 1. Send initial metadata + write + writes done + recv initial metadata
1134 // 2. Read message, recv trailing metadata
1135
1136 start_tag_.Set(
1137 call_.call(),
1138 [this](bool ok) {
1139 reactor_->OnReadInitialMetadataDone(
1140 ok && !reactor_->InternalTrailersOnly(call_.call()));
1141 MaybeFinish();
1142 },
1143 &start_ops_, /*can_inline=*/false);
1144 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1145 context_->initial_metadata_flags());
1146 start_ops_.RecvInitialMetadata(context_);
1147 start_ops_.set_core_cq_tag(&start_tag_);
1148 call_.PerformOps(&start_ops_);
1149
1150 finish_tag_.Set(
1151 call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1152 /*can_inline=*/false);
1153 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1154 finish_ops_.set_core_cq_tag(&finish_tag_);
1155 call_.PerformOps(&finish_ops_);
1156 }
1157
1158 private:
1159 friend class ClientCallbackUnaryFactory;
1160
1161 template <class Request, class Response>
ClientCallbackUnaryImpl(::grpc::internal::Call call,::grpc::ClientContext * context,Request * request,Response * response,ClientUnaryReactor * reactor)1162 ClientCallbackUnaryImpl(::grpc::internal::Call call,
1163 ::grpc::ClientContext* context, Request* request,
1164 Response* response, ClientUnaryReactor* reactor)
1165 : context_(context), call_(call), reactor_(reactor) {
1166 this->BindReactor(reactor);
1167 // TODO(vjpai): don't assert
1168 GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1169 start_ops_.ClientSendClose();
1170 finish_ops_.RecvMessage(response);
1171 finish_ops_.AllowNoMessage();
1172 }
1173
1174 // In the unary case, MaybeFinish is only ever invoked from a
1175 // library-initiated reaction, so it will just directly call OnDone if this is
1176 // the last reaction for this RPC.
MaybeFinish()1177 void MaybeFinish() {
1178 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1179 1, std::memory_order_acq_rel) == 1)) {
1180 ::grpc::Status s = std::move(finish_status_);
1181 auto* reactor = reactor_;
1182 auto* call = call_.call();
1183 this->~ClientCallbackUnaryImpl();
1184 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1185 reactor->OnDone(s);
1186 }
1187 }
1188
1189 ::grpc::ClientContext* const context_;
1190 grpc::internal::Call call_;
1191 ClientUnaryReactor* const reactor_;
1192
1193 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1194 grpc::internal::CallOpSendMessage,
1195 grpc::internal::CallOpClientSendClose,
1196 grpc::internal::CallOpRecvInitialMetadata>
1197 start_ops_;
1198 grpc::internal::CallbackWithSuccessTag start_tag_;
1199
1200 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1201 grpc::internal::CallOpClientRecvStatus>
1202 finish_ops_;
1203 grpc::internal::CallbackWithSuccessTag finish_tag_;
1204 ::grpc::Status finish_status_;
1205
1206 // This call will have 2 callbacks: start and finish
1207 std::atomic<intptr_t> callbacks_outstanding_{2};
1208 };
1209
1210 class ClientCallbackUnaryFactory {
1211 public:
1212 template <class Request, class Response, class BaseRequest = Request,
1213 class BaseResponse = Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const Request * request,Response * response,ClientUnaryReactor * reactor)1214 static void Create(::grpc::ChannelInterface* channel,
1215 const ::grpc::internal::RpcMethod& method,
1216 ::grpc::ClientContext* context, const Request* request,
1217 Response* response, ClientUnaryReactor* reactor) {
1218 grpc::internal::Call call =
1219 channel->CreateCall(method, context, channel->CallbackCQ());
1220
1221 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1222
1223 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1224 call.call(), sizeof(ClientCallbackUnaryImpl)))
1225 ClientCallbackUnaryImpl(call, context,
1226 static_cast<const BaseRequest*>(request),
1227 static_cast<BaseResponse*>(response), reactor);
1228 }
1229 };
1230
1231 } // namespace internal
1232 } // namespace grpc
1233 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
1234