1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/ext/transport/binder/transport/binder_transport.h"
18 
19 #ifndef GRPC_NO_BINDER
20 
21 #include <cstdint>
22 #include <memory>
23 #include <string>
24 #include <utility>
25 
26 #include "absl/memory/memory.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/substitute.h"
29 
30 #include <grpc/support/log.h>
31 
32 #include "src/core/ext/transport/binder/transport/binder_stream.h"
33 #include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
34 #include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h"
35 #include "src/core/ext/transport/binder/wire_format/wire_reader.h"
36 #include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
37 #include "src/core/ext/transport/binder/wire_format/wire_writer.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/slice/slice_utils.h"
40 #include "src/core/lib/transport/byte_stream.h"
41 #include "src/core/lib/transport/error_utils.h"
42 #include "src/core/lib/transport/metadata_batch.h"
43 #include "src/core/lib/transport/static_metadata.h"
44 #include "src/core/lib/transport/status_metadata.h"
45 #include "src/core/lib/transport/transport.h"
46 
47 #ifndef NDEBUG
grpc_binder_stream_ref(grpc_binder_stream * s,const char * reason)48 static void grpc_binder_stream_ref(grpc_binder_stream* s, const char* reason) {
49   grpc_stream_ref(s->refcount, reason);
50 }
grpc_binder_stream_unref(grpc_binder_stream * s,const char * reason)51 static void grpc_binder_stream_unref(grpc_binder_stream* s,
52                                      const char* reason) {
53   grpc_stream_unref(s->refcount, reason);
54 }
grpc_binder_ref_transport(grpc_binder_transport * t,const char * reason,const char * file,int line)55 static void grpc_binder_ref_transport(grpc_binder_transport* t,
56                                       const char* reason, const char* file,
57                                       int line) {
58   t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
59 }
grpc_binder_unref_transport(grpc_binder_transport * t,const char * reason,const char * file,int line)60 static void grpc_binder_unref_transport(grpc_binder_transport* t,
61                                         const char* reason, const char* file,
62                                         int line) {
63   if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
64     delete t;
65   }
66 }
67 #else
grpc_binder_stream_ref(grpc_binder_stream * s)68 static void grpc_binder_stream_ref(grpc_binder_stream* s) {
69   grpc_stream_ref(s->refcount);
70 }
grpc_binder_stream_unref(grpc_binder_stream * s)71 static void grpc_binder_stream_unref(grpc_binder_stream* s) {
72   grpc_stream_unref(s->refcount);
73 }
grpc_binder_ref_transport(grpc_binder_transport * t)74 static void grpc_binder_ref_transport(grpc_binder_transport* t) {
75   t->refs.Ref();
76 }
grpc_binder_unref_transport(grpc_binder_transport * t)77 static void grpc_binder_unref_transport(grpc_binder_transport* t) {
78   if (t->refs.Unref()) {
79     delete t;
80   }
81 }
82 #endif
83 
// Call-site wrappers for the ref-counting helpers defined in this file. Both
// build flavours accept a `reason` argument so that call sites are identical;
// release builds (NDEBUG) discard it, while debug builds forward it and, for
// the transport macros, also pass the caller's __FILE__/__LINE__.
#ifdef NDEBUG
#define GRPC_BINDER_STREAM_REF(stream, reason) grpc_binder_stream_ref(stream)
#define GRPC_BINDER_STREAM_UNREF(stream, reason) \
  grpc_binder_stream_unref(stream)
#define GRPC_BINDER_REF_TRANSPORT(t, r) grpc_binder_ref_transport(t)
#define GRPC_BINDER_UNREF_TRANSPORT(t, r) grpc_binder_unref_transport(t)
#else
#define GRPC_BINDER_STREAM_REF(stream, reason) \
  grpc_binder_stream_ref(stream, reason)
#define GRPC_BINDER_STREAM_UNREF(stream, reason) \
  grpc_binder_stream_unref(stream, reason)
#define GRPC_BINDER_REF_TRANSPORT(t, r) \
  grpc_binder_ref_transport(t, r, __FILE__, __LINE__)
#define GRPC_BINDER_UNREF_TRANSPORT(t, r) \
  grpc_binder_unref_transport(t, r, __FILE__, __LINE__)
#endif
100 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)101 static int init_stream(grpc_transport* gt, grpc_stream* gs,
102                        grpc_stream_refcount* refcount, const void* server_data,
103                        grpc_core::Arena* arena) {
104   GPR_TIMER_SCOPE("init_stream", 0);
105   gpr_log(GPR_INFO, "%s = %p %p %p %p %p", __func__, gt, gs, refcount,
106           server_data, arena);
107   grpc_binder_transport* t = reinterpret_cast<grpc_binder_transport*>(gt);
108   // TODO(mingcl): Figure out if we need to worry about concurrent invocation
109   // here
110   new (gs) grpc_binder_stream(t, refcount, server_data, arena,
111                               t->NewStreamTxCode(), t->is_client);
112   return 0;
113 }
114 
set_pollset(grpc_transport * gt,grpc_stream * gs,grpc_pollset * gp)115 static void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* gp) {
116   gpr_log(GPR_INFO, "%s = %p %p %p", __func__, gt, gs, gp);
117 }
118 
set_pollset_set(grpc_transport *,grpc_stream *,grpc_pollset_set *)119 static void set_pollset_set(grpc_transport*, grpc_stream*, grpc_pollset_set*) {
120   gpr_log(GPR_INFO, __func__);
121 }
122 
AssignMetadata(grpc_metadata_batch * mb,const grpc_binder::Metadata & md)123 static void AssignMetadata(grpc_metadata_batch* mb,
124                            const grpc_binder::Metadata& md) {
125   mb->Clear();
126   for (auto& p : md) {
127     mb->Append(p.first, grpc_core::Slice::FromCopiedString(p.second));
128   }
129 }
130 
cancel_stream_locked(grpc_binder_transport * gbt,grpc_binder_stream * gbs,grpc_error_handle error)131 static void cancel_stream_locked(grpc_binder_transport* gbt,
132                                  grpc_binder_stream* gbs,
133                                  grpc_error_handle error) {
134   gpr_log(GPR_INFO, "cancel_stream_locked");
135   if (!gbs->is_closed) {
136     GPR_ASSERT(gbs->cancel_self_error == GRPC_ERROR_NONE);
137     gbs->is_closed = true;
138     gbs->cancel_self_error = GRPC_ERROR_REF(error);
139     gbt->transport_stream_receiver->CancelStream(gbs->tx_code);
140     gbt->registered_stream.erase(gbs->tx_code);
141     if (gbs->recv_initial_metadata_ready != nullptr) {
142       grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_initial_metadata_ready,
143                               GRPC_ERROR_REF(error));
144       gbs->recv_initial_metadata_ready = nullptr;
145       gbs->recv_initial_metadata = nullptr;
146       gbs->trailing_metadata_available = nullptr;
147     }
148     if (gbs->recv_message_ready != nullptr) {
149       grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_message_ready,
150                               GRPC_ERROR_REF(error));
151       gbs->recv_message_ready = nullptr;
152       gbs->recv_message->reset();
153       gbs->recv_message = nullptr;
154       gbs->call_failed_before_recv_message = nullptr;
155     }
156     if (gbs->recv_trailing_metadata_finished != nullptr) {
157       grpc_core::ExecCtx::Run(DEBUG_LOCATION,
158                               gbs->recv_trailing_metadata_finished,
159                               GRPC_ERROR_REF(error));
160       gbs->recv_trailing_metadata_finished = nullptr;
161       gbs->recv_trailing_metadata = nullptr;
162     }
163   }
164   GRPC_ERROR_UNREF(error);
165 }
166 
ContainsAuthorityAndPath(const grpc_binder::Metadata & metadata)167 static bool ContainsAuthorityAndPath(const grpc_binder::Metadata& metadata) {
168   bool has_authority = false;
169   bool has_path = false;
170   for (const auto& kv : metadata) {
171     if (kv.first == grpc_core::StringViewFromSlice(GRPC_MDSTR_AUTHORITY)) {
172       has_authority = true;
173     }
174     if (kv.first == grpc_core::StringViewFromSlice(GRPC_MDSTR_PATH)) {
175       has_path = true;
176     }
177   }
178   return has_authority && has_path;
179 }
180 
recv_initial_metadata_locked(void * arg,grpc_error_handle)181 static void recv_initial_metadata_locked(void* arg,
182                                          grpc_error_handle /*error*/) {
183   RecvInitialMetadataArgs* args = static_cast<RecvInitialMetadataArgs*>(arg);
184   grpc_binder_stream* gbs = args->gbs;
185 
186   gpr_log(GPR_INFO,
187           "recv_initial_metadata_locked is_client = %d is_closed = %d",
188           gbs->is_client, gbs->is_closed);
189 
190   if (!gbs->is_closed) {
191     grpc_error_handle error = [&] {
192       GPR_ASSERT(gbs->recv_initial_metadata);
193       GPR_ASSERT(gbs->recv_initial_metadata_ready);
194       if (!args->initial_metadata.ok()) {
195         gpr_log(GPR_ERROR, "Failed to parse initial metadata");
196         return absl_status_to_grpc_error(args->initial_metadata.status());
197       }
198       if (!gbs->is_client) {
199         // For server, we expect :authority and :path in initial metadata.
200         if (!ContainsAuthorityAndPath(*args->initial_metadata)) {
201           return GRPC_ERROR_CREATE_FROM_CPP_STRING(
202               "Missing :authority or :path in initial metadata");
203         }
204       }
205       AssignMetadata(gbs->recv_initial_metadata, *args->initial_metadata);
206       return GRPC_ERROR_NONE;
207     }();
208 
209     grpc_closure* cb = gbs->recv_initial_metadata_ready;
210     gbs->recv_initial_metadata_ready = nullptr;
211     gbs->recv_initial_metadata = nullptr;
212     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
213   }
214   GRPC_BINDER_STREAM_UNREF(gbs, "recv_initial_metadata");
215 }
216 
recv_message_locked(void * arg,grpc_error_handle)217 static void recv_message_locked(void* arg, grpc_error_handle /*error*/) {
218   RecvMessageArgs* args = static_cast<RecvMessageArgs*>(arg);
219   grpc_binder_stream* gbs = args->gbs;
220 
221   gpr_log(GPR_INFO, "recv_message_locked is_client = %d is_closed = %d",
222           gbs->is_client, gbs->is_closed);
223 
224   if (!gbs->is_closed) {
225     grpc_error_handle error = [&] {
226       GPR_ASSERT(gbs->recv_message);
227       GPR_ASSERT(gbs->recv_message_ready);
228       if (!args->message.ok()) {
229         gpr_log(GPR_ERROR, "Failed to receive message");
230         if (args->message.status().message() ==
231             grpc_binder::TransportStreamReceiver::
232                 kGrpcBinderTransportCancelledGracefully) {
233           gpr_log(GPR_ERROR, "message cancelled gracefully");
234           // Cancelled because we've already received trailing metadata.
235           // It's not an error in this case.
236           return GRPC_ERROR_NONE;
237         } else {
238           return absl_status_to_grpc_error(args->message.status());
239         }
240       }
241       grpc_slice_buffer buf;
242       grpc_slice_buffer_init(&buf);
243       grpc_slice_buffer_add(&buf, grpc_slice_from_cpp_string(*args->message));
244 
245       gbs->sbs.Init(&buf, 0);
246       gbs->recv_message->reset(gbs->sbs.get());
247       return GRPC_ERROR_NONE;
248     }();
249 
250     if (error != GRPC_ERROR_NONE &&
251         gbs->call_failed_before_recv_message != nullptr) {
252       *gbs->call_failed_before_recv_message = true;
253     }
254     grpc_closure* cb = gbs->recv_message_ready;
255     gbs->recv_message_ready = nullptr;
256     gbs->recv_message = nullptr;
257     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
258   }
259 
260   GRPC_BINDER_STREAM_UNREF(gbs, "recv_message");
261 }
262 
recv_trailing_metadata_locked(void * arg,grpc_error_handle)263 static void recv_trailing_metadata_locked(void* arg,
264                                           grpc_error_handle /*error*/) {
265   RecvTrailingMetadataArgs* args = static_cast<RecvTrailingMetadataArgs*>(arg);
266   grpc_binder_stream* gbs = args->gbs;
267 
268   gpr_log(GPR_INFO,
269           "recv_trailing_metadata_locked is_client = %d is_closed = %d",
270           gbs->is_client, gbs->is_closed);
271 
272   if (!gbs->is_closed) {
273     grpc_error_handle error = [&] {
274       GPR_ASSERT(gbs->recv_trailing_metadata);
275       GPR_ASSERT(gbs->recv_trailing_metadata_finished);
276       if (!args->trailing_metadata.ok()) {
277         gpr_log(GPR_ERROR, "Failed to receive trailing metadata");
278         return absl_status_to_grpc_error(args->trailing_metadata.status());
279       }
280       if (!gbs->is_client) {
281         // Client will not send non-empty trailing metadata.
282         if (!args->trailing_metadata.value().empty()) {
283           gpr_log(GPR_ERROR, "Server receives non-empty trailing metadata.");
284           return GRPC_ERROR_CANCELLED;
285         }
286       } else {
287         AssignMetadata(gbs->recv_trailing_metadata, *args->trailing_metadata);
288         // Append status to metadata
289         // TODO(b/192208695): See if we can avoid to manually put status
290         // code into the header
291         gpr_log(GPR_INFO, "status = %d", args->status);
292         grpc_linked_mdelem* glm = static_cast<grpc_linked_mdelem*>(
293             gbs->arena->Alloc(sizeof(grpc_linked_mdelem)));
294         glm->md = grpc_get_reffed_status_elem(args->status);
295         GPR_ASSERT(gbs->recv_trailing_metadata->LinkTail(glm) ==
296                    GRPC_ERROR_NONE);
297         gpr_log(GPR_INFO, "trailing_metadata = %p",
298                 gbs->recv_trailing_metadata);
299         gpr_log(GPR_INFO, "glm = %p", glm);
300       }
301       return GRPC_ERROR_NONE;
302     }();
303 
304     if (gbs->is_client || gbs->trailing_metadata_sent) {
305       grpc_closure* cb = gbs->recv_trailing_metadata_finished;
306       gbs->recv_trailing_metadata_finished = nullptr;
307       gbs->recv_trailing_metadata = nullptr;
308       grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
309     } else {
310       // According to transport explaineer - "Server extra: This op shouldn't
311       // actually be considered complete until the server has also sent trailing
312       // metadata to provide the other side with final status"
313       //
314       // We haven't sent trailing metadata yet, so we have to delay completing
315       // the recv_trailing_metadata callback.
316       gbs->need_to_call_trailing_metadata_callback = true;
317     }
318   }
319   GRPC_BINDER_STREAM_UNREF(gbs, "recv_trailing_metadata");
320 }
321 
322 namespace grpc_binder {
323 namespace {
324 
325 class MetadataEncoder {
326  public:
MetadataEncoder(bool is_client,Transaction * tx,Metadata * init_md)327   MetadataEncoder(bool is_client, Transaction* tx, Metadata* init_md)
328       : is_client_(is_client), tx_(tx), init_md_(init_md) {}
329 
Encode(grpc_mdelem md)330   void Encode(grpc_mdelem md) {
331     absl::string_view key = grpc_core::StringViewFromSlice(GRPC_MDKEY(md));
332     absl::string_view value = grpc_core::StringViewFromSlice(GRPC_MDVALUE(md));
333     gpr_log(GPR_INFO, "send metadata key-value %s",
334             absl::StrCat(key, " ", value).c_str());
335     if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
336       // TODO(b/192208403): Figure out if it is correct to simply drop '/'
337       // prefix and treat it as rpc method name
338       GPR_ASSERT(value[0] == '/');
339       std::string path = std::string(value).substr(1);
340 
341       // Only client send method ref.
342       GPR_ASSERT(is_client_);
343       tx_->SetMethodRef(path);
344     } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS)) {
345       int status = grpc_get_status_code_from_metadata(md);
346       gpr_log(GPR_INFO, "send trailing metadata status = %d", status);
347       tx_->SetStatus(status);
348     } else {
349       init_md_->emplace_back(std::string(key), std::string(value));
350     }
351   }
352 
353   template <typename Trait>
Encode(Trait,const typename Trait::ValueType & value)354   void Encode(Trait, const typename Trait::ValueType& value) {
355     init_md_->emplace_back(std::string(Trait::key()),
356                            std::string(Trait::Encode(value).as_string_view()));
357   }
358 
359  private:
360   const bool is_client_;
361   Transaction* const tx_;
362   Metadata* const init_md_;
363 };
364 
365 }  // namespace
366 }  // namespace grpc_binder
367 
perform_stream_op_locked(void * stream_op,grpc_error_handle)368 static void perform_stream_op_locked(void* stream_op,
369                                      grpc_error_handle /*error*/) {
370   grpc_transport_stream_op_batch* op =
371       static_cast<grpc_transport_stream_op_batch*>(stream_op);
372   grpc_binder_stream* gbs =
373       static_cast<grpc_binder_stream*>(op->handler_private.extra_arg);
374   grpc_binder_transport* gbt = gbs->t;
375   if (op->cancel_stream) {
376     // TODO(waynetu): Is this true?
377     GPR_ASSERT(!op->send_initial_metadata && !op->send_message &&
378                !op->send_trailing_metadata && !op->recv_initial_metadata &&
379                !op->recv_message && !op->recv_trailing_metadata);
380     gpr_log(GPR_INFO, "cancel_stream is_client = %d", gbs->is_client);
381     if (!gbs->is_client) {
382       // Send trailing metadata to inform the other end about the cancellation,
383       // regardless if we'd already done that or not.
384       grpc_binder::Transaction cancel_tx(gbs->GetTxCode(), gbt->is_client);
385       cancel_tx.SetSuffix(grpc_binder::Metadata{});
386       cancel_tx.SetStatus(1);
387       absl::Status status = gbt->wire_writer->RpcCall(cancel_tx);
388     }
389     cancel_stream_locked(gbt, gbs, op->payload->cancel_stream.cancel_error);
390     if (op->on_complete != nullptr) {
391       grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, GRPC_ERROR_NONE);
392     }
393     GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
394     return;
395   }
396 
397   if (gbs->is_closed) {
398     if (op->send_message) {
399       // Reset the send_message payload to prevent memory leaks.
400       op->payload->send_message.send_message.reset();
401     }
402     if (op->recv_initial_metadata) {
403       grpc_core::ExecCtx::Run(
404           DEBUG_LOCATION,
405           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
406           GRPC_ERROR_REF(gbs->cancel_self_error));
407     }
408     if (op->recv_message) {
409       grpc_core::ExecCtx::Run(DEBUG_LOCATION,
410                               op->payload->recv_message.recv_message_ready,
411                               GRPC_ERROR_REF(gbs->cancel_self_error));
412     }
413     if (op->recv_trailing_metadata) {
414       grpc_core::ExecCtx::Run(
415           DEBUG_LOCATION,
416           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
417           GRPC_ERROR_REF(gbs->cancel_self_error));
418     }
419     if (op->on_complete != nullptr) {
420       grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
421                               GRPC_ERROR_REF(gbs->cancel_self_error));
422     }
423     GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
424     return;
425   }
426 
427   int tx_code = gbs->tx_code;
428   grpc_binder::Transaction tx(tx_code, gbt->is_client);
429 
430   if (op->send_initial_metadata) {
431     gpr_log(GPR_INFO, "send_initial_metadata");
432     grpc_binder::Metadata init_md;
433     auto batch = op->payload->send_initial_metadata.send_initial_metadata;
434 
435     grpc_binder::MetadataEncoder encoder(gbt->is_client, &tx, &init_md);
436     batch->Encode(&encoder);
437     tx.SetPrefix(init_md);
438   }
439   if (op->send_message) {
440     gpr_log(GPR_INFO, "send_message");
441     size_t remaining = op->payload->send_message.send_message->length();
442     std::string message_data;
443     while (remaining > 0) {
444       grpc_slice message_slice;
445       // TODO(waynetu): Temporarily assume that the message is ready.
446       GPR_ASSERT(
447           op->payload->send_message.send_message->Next(SIZE_MAX, nullptr));
448       grpc_error_handle error =
449           op->payload->send_message.send_message->Pull(&message_slice);
450       // TODO(waynetu): Cancel the stream if error is not GRPC_ERROR_NONE.
451       GPR_ASSERT(error == GRPC_ERROR_NONE);
452       uint8_t* p = GRPC_SLICE_START_PTR(message_slice);
453       size_t len = GRPC_SLICE_LENGTH(message_slice);
454       remaining -= len;
455       message_data += std::string(reinterpret_cast<char*>(p), len);
456       grpc_slice_unref_internal(message_slice);
457     }
458     gpr_log(GPR_INFO, "message_data = %s", message_data.c_str());
459     tx.SetData(message_data);
460     // TODO(b/192369787): Are we supposed to reset here to avoid
461     // use-after-free issue in call.cc?
462     op->payload->send_message.send_message.reset();
463   }
464 
465   if (op->send_trailing_metadata) {
466     gpr_log(GPR_INFO, "send_trailing_metadata");
467     auto batch = op->payload->send_trailing_metadata.send_trailing_metadata;
468     grpc_binder::Metadata trailing_metadata;
469 
470     grpc_binder::MetadataEncoder encoder(gbt->is_client, &tx,
471                                          &trailing_metadata);
472     batch->Encode(&encoder);
473 
474     // TODO(mingcl): Will we ever has key-value pair here? According to
475     // wireformat client suffix data is always empty.
476     tx.SetSuffix(trailing_metadata);
477   }
478   if (op->recv_initial_metadata) {
479     gpr_log(GPR_INFO, "recv_initial_metadata");
480     gbs->recv_initial_metadata_ready =
481         op->payload->recv_initial_metadata.recv_initial_metadata_ready;
482     gbs->recv_initial_metadata =
483         op->payload->recv_initial_metadata.recv_initial_metadata;
484     gbs->trailing_metadata_available =
485         op->payload->recv_initial_metadata.trailing_metadata_available;
486     GRPC_BINDER_STREAM_REF(gbs, "recv_initial_metadata");
487     gbt->transport_stream_receiver->RegisterRecvInitialMetadata(
488         tx_code, [tx_code, gbs,
489                   gbt](absl::StatusOr<grpc_binder::Metadata> initial_metadata) {
490           grpc_core::ExecCtx exec_ctx;
491           gbs->recv_initial_metadata_args.tx_code = tx_code;
492           gbs->recv_initial_metadata_args.initial_metadata =
493               std::move(initial_metadata);
494           gbt->combiner->Run(
495               GRPC_CLOSURE_INIT(&gbs->recv_initial_metadata_closure,
496                                 recv_initial_metadata_locked,
497                                 &gbs->recv_initial_metadata_args, nullptr),
498               GRPC_ERROR_NONE);
499         });
500   }
501   if (op->recv_message) {
502     gpr_log(GPR_INFO, "recv_message");
503     gbs->recv_message_ready = op->payload->recv_message.recv_message_ready;
504     gbs->recv_message = op->payload->recv_message.recv_message;
505     gbs->call_failed_before_recv_message =
506         op->payload->recv_message.call_failed_before_recv_message;
507     GRPC_BINDER_STREAM_REF(gbs, "recv_message");
508     gbt->transport_stream_receiver->RegisterRecvMessage(
509         tx_code, [tx_code, gbs, gbt](absl::StatusOr<std::string> message) {
510           grpc_core::ExecCtx exec_ctx;
511           gbs->recv_message_args.tx_code = tx_code;
512           gbs->recv_message_args.message = std::move(message);
513           gbt->combiner->Run(
514               GRPC_CLOSURE_INIT(&gbs->recv_message_closure, recv_message_locked,
515                                 &gbs->recv_message_args, nullptr),
516               GRPC_ERROR_NONE);
517         });
518   }
519   if (op->recv_trailing_metadata) {
520     gpr_log(GPR_INFO, "recv_trailing_metadata");
521     gbs->recv_trailing_metadata_finished =
522         op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
523     gbs->recv_trailing_metadata =
524         op->payload->recv_trailing_metadata.recv_trailing_metadata;
525     GRPC_BINDER_STREAM_REF(gbs, "recv_trailing_metadata");
526     gbt->transport_stream_receiver->RegisterRecvTrailingMetadata(
527         tx_code, [tx_code, gbs, gbt](
528                      absl::StatusOr<grpc_binder::Metadata> trailing_metadata,
529                      int status) {
530           grpc_core::ExecCtx exec_ctx;
531           gbs->recv_trailing_metadata_args.tx_code = tx_code;
532           gbs->recv_trailing_metadata_args.trailing_metadata =
533               std::move(trailing_metadata);
534           gbs->recv_trailing_metadata_args.status = status;
535           gbt->combiner->Run(
536               GRPC_CLOSURE_INIT(&gbs->recv_trailing_metadata_closure,
537                                 recv_trailing_metadata_locked,
538                                 &gbs->recv_trailing_metadata_args, nullptr),
539               GRPC_ERROR_NONE);
540         });
541   }
542   // Only send transaction when there's a send op presented.
543   absl::Status status = absl::OkStatus();
544   if (op->send_initial_metadata || op->send_message ||
545       op->send_trailing_metadata) {
546     // TODO(waynetu): RpcCall() is doing a lot of work (including waiting for
547     // acknowledgements from the other side). Consider delaying this operation
548     // with combiner.
549     status = gbt->wire_writer->RpcCall(tx);
550     if (!gbs->is_client && op->send_trailing_metadata) {
551       gbs->trailing_metadata_sent = true;
552       // According to transport explaineer - "Server extra: This op shouldn't
553       // actually be considered complete until the server has also sent trailing
554       // metadata to provide the other side with final status"
555       //
556       // Because we've done sending trailing metadata here, we can safely
557       // complete the recv_trailing_metadata callback here.
558       if (gbs->need_to_call_trailing_metadata_callback) {
559         grpc_closure* cb = gbs->recv_trailing_metadata_finished;
560         gbs->recv_trailing_metadata_finished = nullptr;
561         grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
562         gbs->need_to_call_trailing_metadata_callback = false;
563       }
564     }
565   }
566   // Note that this should only be scheduled when all non-recv ops are
567   // completed
568   if (op->on_complete != nullptr) {
569     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
570                             absl_status_to_grpc_error(status));
571     gpr_log(GPR_INFO, "on_complete closure schuduled");
572   }
573   GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
574 }
575 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)576 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
577                               grpc_transport_stream_op_batch* op) {
578   GPR_TIMER_SCOPE("perform_stream_op", 0);
579   grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
580   grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
581   gpr_log(GPR_INFO, "%s = %p %p %p is_client = %d", __func__, gt, gs, op,
582           gbs->is_client);
583   GRPC_BINDER_STREAM_REF(gbs, "perform_stream_op");
584   op->handler_private.extra_arg = gbs;
585   gbt->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
586                                        perform_stream_op_locked, op, nullptr),
587                      GRPC_ERROR_NONE);
588 }
589 
close_transport_locked(grpc_binder_transport * gbt)590 static void close_transport_locked(grpc_binder_transport* gbt) {
591   gbt->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(),
592                               "transport closed due to disconnection/goaway");
593   while (!gbt->registered_stream.empty()) {
594     cancel_stream_locked(
595         gbt, gbt->registered_stream.begin()->second,
596         grpc_error_set_int(
597             GRPC_ERROR_CREATE_FROM_STATIC_STRING("transport closed"),
598             GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
599   }
600 }
601 
perform_transport_op_locked(void * transport_op,grpc_error_handle)602 static void perform_transport_op_locked(void* transport_op,
603                                         grpc_error_handle /*error*/) {
604   grpc_transport_op* op = static_cast<grpc_transport_op*>(transport_op);
605   grpc_binder_transport* gbt =
606       static_cast<grpc_binder_transport*>(op->handler_private.extra_arg);
607   // TODO(waynetu): Should we lock here to avoid data race?
608   if (op->start_connectivity_watch != nullptr) {
609     gbt->state_tracker.AddWatcher(op->start_connectivity_watch_state,
610                                   std::move(op->start_connectivity_watch));
611   }
612   if (op->stop_connectivity_watch != nullptr) {
613     gbt->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
614   }
615   if (op->set_accept_stream) {
616     gbt->accept_stream_fn = op->set_accept_stream_fn;
617     gbt->accept_stream_user_data = op->set_accept_stream_user_data;
618   }
619   if (op->on_consumed) {
620     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
621   }
622   bool do_close = false;
623   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
624     do_close = true;
625     GRPC_ERROR_UNREF(op->disconnect_with_error);
626   }
627   if (op->goaway_error != GRPC_ERROR_NONE) {
628     do_close = true;
629     GRPC_ERROR_UNREF(op->goaway_error);
630   }
631   if (do_close) {
632     close_transport_locked(gbt);
633   }
634   GRPC_BINDER_UNREF_TRANSPORT(gbt, "perform_transport_op");
635 }
636 
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)637 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
638   gpr_log(GPR_INFO, __func__);
639   grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
640   op->handler_private.extra_arg = gbt;
641   GRPC_BINDER_REF_TRANSPORT(gbt, "perform_transport_op");
642   gbt->combiner->Run(
643       GRPC_CLOSURE_INIT(&op->handler_private.closure,
644                         perform_transport_op_locked, op, nullptr),
645       GRPC_ERROR_NONE);
646 }
647 
destroy_stream_locked(void * sp,grpc_error_handle)648 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
649   grpc_binder_stream* gbs = static_cast<grpc_binder_stream*>(sp);
650   grpc_binder_transport* gbt = gbs->t;
651   cancel_stream_locked(
652       gbt, gbs,
653       grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("destroy stream"),
654                          GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
655   gbs->~grpc_binder_stream();
656 }
657 
destroy_stream(grpc_transport *,grpc_stream * gs,grpc_closure * then_schedule_closure)658 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
659                            grpc_closure* then_schedule_closure) {
660   gpr_log(GPR_INFO, __func__);
661   grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
662   gbs->destroy_stream_then_closure = then_schedule_closure;
663   gbs->t->combiner->Run(GRPC_CLOSURE_INIT(&gbs->destroy_stream,
664                                           destroy_stream_locked, gbs, nullptr),
665                         GRPC_ERROR_NONE);
666 }
667 
destroy_transport_locked(void * gt,grpc_error_handle)668 static void destroy_transport_locked(void* gt, grpc_error_handle /*error*/) {
669   grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
670   close_transport_locked(gbt);
671   // Release the references held by the transport.
672   gbt->wire_reader = nullptr;
673   gbt->transport_stream_receiver = nullptr;
674   gbt->wire_writer = nullptr;
675   GRPC_BINDER_UNREF_TRANSPORT(gbt, "transport destroyed");
676 }
677 
destroy_transport(grpc_transport * gt)678 static void destroy_transport(grpc_transport* gt) {
679   gpr_log(GPR_INFO, __func__);
680   grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
681   gbt->combiner->Run(
682       GRPC_CLOSURE_CREATE(destroy_transport_locked, gbt, nullptr),
683       GRPC_ERROR_NONE);
684 }
685 
get_endpoint(grpc_transport *)686 static grpc_endpoint* get_endpoint(grpc_transport*) {
687   gpr_log(GPR_INFO, __func__);
688   return nullptr;
689 }
690 
691 // See grpc_transport_vtable declaration for meaning of each field
692 static const grpc_transport_vtable vtable = {sizeof(grpc_binder_stream),
693                                              "binder",
694                                              init_stream,
695                                              set_pollset,
696                                              set_pollset_set,
697                                              perform_stream_op,
698                                              perform_transport_op,
699                                              destroy_stream,
700                                              destroy_transport,
701                                              get_endpoint};
702 
get_vtable()703 static const grpc_transport_vtable* get_vtable() { return &vtable; }
704 
accept_stream_locked(void * gt,grpc_error_handle)705 static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) {
706   grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
707   if (gbt->accept_stream_fn) {
708     // must pass in a non-null value.
709     (*gbt->accept_stream_fn)(gbt->accept_stream_user_data, &gbt->base, gbt);
710   }
711 }
712 
grpc_binder_transport(std::unique_ptr<grpc_binder::Binder> binder,bool is_client,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)713 grpc_binder_transport::grpc_binder_transport(
714     std::unique_ptr<grpc_binder::Binder> binder, bool is_client,
715     std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)
716     : is_client(is_client),
717       combiner(grpc_combiner_create()),
718       state_tracker(
719           is_client ? "binder_transport_client" : "binder_transport_server",
720           GRPC_CHANNEL_READY),
721       refs(1, nullptr) {
722   gpr_log(GPR_INFO, __func__);
723   base.vtable = get_vtable();
724   GRPC_CLOSURE_INIT(&accept_stream_closure, accept_stream_locked, this,
725                     nullptr);
726   transport_stream_receiver =
727       std::make_shared<grpc_binder::TransportStreamReceiverImpl>(
728           is_client, /*accept_stream_callback=*/[this] {
729             grpc_core::ExecCtx exec_ctx;
730             combiner->Run(&accept_stream_closure, GRPC_ERROR_NONE);
731           });
732   // WireReader holds a ref to grpc_binder_transport.
733   GRPC_BINDER_REF_TRANSPORT(this, "wire reader");
734   wire_reader = grpc_core::MakeOrphanable<grpc_binder::WireReaderImpl>(
735       transport_stream_receiver, is_client, security_policy,
736       /*on_destruct_callback=*/
737       [this] {
738         // Unref transport when destructed.
739         GRPC_BINDER_UNREF_TRANSPORT(this, "wire reader");
740       });
741   wire_writer = wire_reader->SetupTransport(std::move(binder));
742 }
743 
~grpc_binder_transport()744 grpc_binder_transport::~grpc_binder_transport() {
745   GRPC_COMBINER_UNREF(combiner, "binder_transport");
746 }
747 
grpc_create_binder_transport_client(std::unique_ptr<grpc_binder::Binder> endpoint_binder,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)748 grpc_transport* grpc_create_binder_transport_client(
749     std::unique_ptr<grpc_binder::Binder> endpoint_binder,
750     std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
751         security_policy) {
752   gpr_log(GPR_INFO, __func__);
753 
754   GPR_ASSERT(endpoint_binder != nullptr);
755   GPR_ASSERT(security_policy != nullptr);
756 
757   grpc_binder_transport* t = new grpc_binder_transport(
758       std::move(endpoint_binder), /*is_client=*/true, security_policy);
759 
760   return &t->base;
761 }
762 
grpc_create_binder_transport_server(std::unique_ptr<grpc_binder::Binder> client_binder,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)763 grpc_transport* grpc_create_binder_transport_server(
764     std::unique_ptr<grpc_binder::Binder> client_binder,
765     std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
766         security_policy) {
767   gpr_log(GPR_INFO, __func__);
768 
769   GPR_ASSERT(client_binder != nullptr);
770   GPR_ASSERT(security_policy != nullptr);
771 
772   grpc_binder_transport* t = new grpc_binder_transport(
773       std::move(client_binder), /*is_client=*/false, security_policy);
774 
775   return &t->base;
776 }
777 #endif
778