1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/ext/transport/binder/transport/binder_transport.h"
18 
19 #ifndef GRPC_NO_BINDER
20 
21 #include <cstdint>
22 #include <memory>
23 #include <string>
24 #include <utility>
25 
26 #include "absl/memory/memory.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/substitute.h"
29 
30 #include <grpc/support/log.h>
31 
32 #include "src/core/ext/transport/binder/transport/binder_stream.h"
33 #include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
34 #include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h"
35 #include "src/core/ext/transport/binder/wire_format/wire_reader.h"
36 #include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
37 #include "src/core/ext/transport/binder/wire_format/wire_writer.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/slice/slice_utils.h"
40 #include "src/core/lib/transport/byte_stream.h"
41 #include "src/core/lib/transport/error_utils.h"
42 #include "src/core/lib/transport/metadata_batch.h"
43 #include "src/core/lib/transport/static_metadata.h"
44 #include "src/core/lib/transport/status_metadata.h"
45 #include "src/core/lib/transport/transport.h"
46 
47 #ifndef NDEBUG
grpc_binder_stream_ref(grpc_binder_stream * s,const char * reason)48 static void grpc_binder_stream_ref(grpc_binder_stream* s, const char* reason) {
49   grpc_stream_ref(s->refcount, reason);
50 }
grpc_binder_stream_unref(grpc_binder_stream * s,const char * reason)51 static void grpc_binder_stream_unref(grpc_binder_stream* s,
52                                      const char* reason) {
53   grpc_stream_unref(s->refcount, reason);
54 }
grpc_binder_ref_transport(grpc_binder_transport * t,const char * reason,const char * file,int line)55 static void grpc_binder_ref_transport(grpc_binder_transport* t,
56                                       const char* reason, const char* file,
57                                       int line) {
58   t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
59 }
grpc_binder_unref_transport(grpc_binder_transport * t,const char * reason,const char * file,int line)60 static void grpc_binder_unref_transport(grpc_binder_transport* t,
61                                         const char* reason, const char* file,
62                                         int line) {
63   if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
64     delete t;
65   }
66 }
67 #else
grpc_binder_stream_ref(grpc_binder_stream * s)68 static void grpc_binder_stream_ref(grpc_binder_stream* s) {
69   grpc_stream_ref(s->refcount);
70 }
grpc_binder_stream_unref(grpc_binder_stream * s)71 static void grpc_binder_stream_unref(grpc_binder_stream* s) {
72   grpc_stream_unref(s->refcount);
73 }
grpc_binder_ref_transport(grpc_binder_transport * t)74 static void grpc_binder_ref_transport(grpc_binder_transport* t) {
75   t->refs.Ref();
76 }
grpc_binder_unref_transport(grpc_binder_transport * t)77 static void grpc_binder_unref_transport(grpc_binder_transport* t) {
78   if (t->refs.Unref()) {
79     delete t;
80   }
81 }
82 #endif
83 
84 #ifndef NDEBUG
85 #define GRPC_BINDER_STREAM_REF(stream, reason) \
86   grpc_binder_stream_ref(stream, reason)
87 #define GRPC_BINDER_STREAM_UNREF(stream, reason) \
88   grpc_binder_stream_unref(stream, reason)
89 #define GRPC_BINDER_REF_TRANSPORT(t, r) \
90   grpc_binder_ref_transport(t, r, __FILE__, __LINE__)
91 #define GRPC_BINDER_UNREF_TRANSPORT(t, r) \
92   grpc_binder_unref_transport(t, r, __FILE__, __LINE__)
93 #else
94 #define GRPC_BINDER_STREAM_REF(stream, reason) grpc_binder_stream_ref(stream)
95 #define GRPC_BINDER_STREAM_UNREF(stream, reason) \
96   grpc_binder_stream_unref(stream)
97 #define GRPC_BINDER_REF_TRANSPORT(t, r) grpc_binder_ref_transport(t)
98 #define GRPC_BINDER_UNREF_TRANSPORT(t, r) grpc_binder_unref_transport(t)
99 #endif
100 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)101 static int init_stream(grpc_transport* gt, grpc_stream* gs,
102                        grpc_stream_refcount* refcount, const void* server_data,
103                        grpc_core::Arena* arena) {
104   GPR_TIMER_SCOPE("init_stream", 0);
105   gpr_log(GPR_INFO, "%s = %p %p %p %p %p", __func__, gt, gs, refcount,
106           server_data, arena);
107   grpc_binder_transport* t = reinterpret_cast<grpc_binder_transport*>(gt);
108   // TODO(mingcl): Figure out if we need to worry about concurrent invocation
109   // here
110   new (gs) grpc_binder_stream(t, refcount, server_data, arena,
111                               t->NewStreamTxCode(), t->is_client);
112   return 0;
113 }
114 
set_pollset(grpc_transport * gt,grpc_stream * gs,grpc_pollset * gp)115 static void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* gp) {
116   gpr_log(GPR_INFO, "%s = %p %p %p", __func__, gt, gs, gp);
117 }
118 
set_pollset_set(grpc_transport *,grpc_stream *,grpc_pollset_set *)119 static void set_pollset_set(grpc_transport*, grpc_stream*, grpc_pollset_set*) {
120   gpr_log(GPR_INFO, __func__);
121 }
122 
AssignMetadata(grpc_metadata_batch * mb,const grpc_binder::Metadata & md)123 static void AssignMetadata(grpc_metadata_batch* mb,
124                            const grpc_binder::Metadata& md) {
125   mb->Clear();
126   for (auto& p : md) {
127     mb->Append(p.first, grpc_slice_from_cpp_string(p.second));
128   }
129 }
130 
cancel_stream_locked(grpc_binder_transport * gbt,grpc_binder_stream * gbs,grpc_error_handle error)131 static void cancel_stream_locked(grpc_binder_transport* gbt,
132                                  grpc_binder_stream* gbs,
133                                  grpc_error_handle error) {
134   gpr_log(GPR_INFO, "cancel_stream_locked");
135   if (!gbs->is_closed) {
136     GPR_ASSERT(gbs->cancel_self_error == GRPC_ERROR_NONE);
137     gbs->is_closed = true;
138     gbs->cancel_self_error = GRPC_ERROR_REF(error);
139     gbt->transport_stream_receiver->CancelStream(gbs->tx_code);
140     gbt->registered_stream.erase(gbs->tx_code);
141     if (gbs->recv_initial_metadata_ready != nullptr) {
142       grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_initial_metadata_ready,
143                               GRPC_ERROR_REF(error));
144       gbs->recv_initial_metadata_ready = nullptr;
145       gbs->recv_initial_metadata = nullptr;
146       gbs->trailing_metadata_available = nullptr;
147     }
148     if (gbs->recv_message_ready != nullptr) {
149       grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_message_ready,
150                               GRPC_ERROR_REF(error));
151       gbs->recv_message_ready = nullptr;
152       gbs->recv_message->reset();
153       gbs->recv_message = nullptr;
154       gbs->call_failed_before_recv_message = nullptr;
155     }
156     if (gbs->recv_trailing_metadata_finished != nullptr) {
157       grpc_core::ExecCtx::Run(DEBUG_LOCATION,
158                               gbs->recv_trailing_metadata_finished,
159                               GRPC_ERROR_REF(error));
160       gbs->recv_trailing_metadata_finished = nullptr;
161       gbs->recv_trailing_metadata = nullptr;
162     }
163   }
164   GRPC_ERROR_UNREF(error);
165 }
166 
ContainsAuthorityAndPath(const grpc_binder::Metadata & metadata)167 static bool ContainsAuthorityAndPath(const grpc_binder::Metadata& metadata) {
168   bool has_authority = false;
169   bool has_path = false;
170   for (const auto& kv : metadata) {
171     if (kv.first == grpc_core::StringViewFromSlice(GRPC_MDSTR_AUTHORITY)) {
172       has_authority = true;
173     }
174     if (kv.first == grpc_core::StringViewFromSlice(GRPC_MDSTR_PATH)) {
175       has_path = true;
176     }
177   }
178   return has_authority && has_path;
179 }
180 
recv_initial_metadata_locked(void * arg,grpc_error_handle)181 static void recv_initial_metadata_locked(void* arg,
182                                          grpc_error_handle /*error*/) {
183   RecvInitialMetadataArgs* args = static_cast<RecvInitialMetadataArgs*>(arg);
184   grpc_binder_stream* gbs = args->gbs;
185 
186   gpr_log(GPR_INFO,
187           "recv_initial_metadata_locked is_client = %d is_closed = %d",
188           gbs->is_client, gbs->is_closed);
189 
190   if (!gbs->is_closed) {
191     grpc_error_handle error = [&] {
192       GPR_ASSERT(gbs->recv_initial_metadata);
193       GPR_ASSERT(gbs->recv_initial_metadata_ready);
194       if (!args->initial_metadata.ok()) {
195         gpr_log(GPR_ERROR, "Failed to parse initial metadata");
196         return absl_status_to_grpc_error(args->initial_metadata.status());
197       }
198       if (!gbs->is_client) {
199         // For server, we expect :authority and :path in initial metadata.
200         if (!ContainsAuthorityAndPath(*args->initial_metadata)) {
201           return GRPC_ERROR_CREATE_FROM_CPP_STRING(
202               "Missing :authority or :path in initial metadata");
203         }
204       }
205       AssignMetadata(gbs->recv_initial_metadata, *args->initial_metadata);
206       return GRPC_ERROR_NONE;
207     }();
208 
209     grpc_closure* cb = gbs->recv_initial_metadata_ready;
210     gbs->recv_initial_metadata_ready = nullptr;
211     gbs->recv_initial_metadata = nullptr;
212     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
213   }
214   GRPC_BINDER_STREAM_UNREF(gbs, "recv_initial_metadata");
215 }
216 
recv_message_locked(void * arg,grpc_error_handle)217 static void recv_message_locked(void* arg, grpc_error_handle /*error*/) {
218   RecvMessageArgs* args = static_cast<RecvMessageArgs*>(arg);
219   grpc_binder_stream* gbs = args->gbs;
220 
221   gpr_log(GPR_INFO, "recv_message_locked is_client = %d is_closed = %d",
222           gbs->is_client, gbs->is_closed);
223 
224   if (!gbs->is_closed) {
225     grpc_error_handle error = [&] {
226       GPR_ASSERT(gbs->recv_message);
227       GPR_ASSERT(gbs->recv_message_ready);
228       if (!args->message.ok()) {
229         gpr_log(GPR_ERROR, "Failed to receive message");
230         if (args->message.status().message() ==
231             grpc_binder::TransportStreamReceiver::
232                 kGrpcBinderTransportCancelledGracefully) {
233           gpr_log(GPR_ERROR, "message cancelled gracefully");
234           // Cancelled because we've already received trailing metadata.
235           // It's not an error in this case.
236           return GRPC_ERROR_NONE;
237         } else {
238           return absl_status_to_grpc_error(args->message.status());
239         }
240       }
241       grpc_slice_buffer buf;
242       grpc_slice_buffer_init(&buf);
243       grpc_slice_buffer_add(&buf, grpc_slice_from_cpp_string(*args->message));
244 
245       gbs->sbs.Init(&buf, 0);
246       gbs->recv_message->reset(gbs->sbs.get());
247       return GRPC_ERROR_NONE;
248     }();
249 
250     if (error != GRPC_ERROR_NONE &&
251         gbs->call_failed_before_recv_message != nullptr) {
252       *gbs->call_failed_before_recv_message = true;
253     }
254     grpc_closure* cb = gbs->recv_message_ready;
255     gbs->recv_message_ready = nullptr;
256     gbs->recv_message = nullptr;
257     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
258   }
259 
260   GRPC_BINDER_STREAM_UNREF(gbs, "recv_message");
261 }
262 
recv_trailing_metadata_locked(void * arg,grpc_error_handle)263 static void recv_trailing_metadata_locked(void* arg,
264                                           grpc_error_handle /*error*/) {
265   RecvTrailingMetadataArgs* args = static_cast<RecvTrailingMetadataArgs*>(arg);
266   grpc_binder_stream* gbs = args->gbs;
267 
268   gpr_log(GPR_INFO,
269           "recv_trailing_metadata_locked is_client = %d is_closed = %d",
270           gbs->is_client, gbs->is_closed);
271 
272   if (!gbs->is_closed) {
273     grpc_error_handle error = [&] {
274       GPR_ASSERT(gbs->recv_trailing_metadata);
275       GPR_ASSERT(gbs->recv_trailing_metadata_finished);
276       if (!args->trailing_metadata.ok()) {
277         gpr_log(GPR_ERROR, "Failed to receive trailing metadata");
278         return absl_status_to_grpc_error(args->trailing_metadata.status());
279       }
280       if (!gbs->is_client) {
281         // Client will not send non-empty trailing metadata.
282         if (!args->trailing_metadata.value().empty()) {
283           gpr_log(GPR_ERROR, "Server receives non-empty trailing metadata.");
284           return GRPC_ERROR_CANCELLED;
285         }
286       } else {
287         AssignMetadata(gbs->recv_trailing_metadata, *args->trailing_metadata);
288         // Append status to metadata
289         // TODO(b/192208695): See if we can avoid to manually put status
290         // code into the header
291         gpr_log(GPR_INFO, "status = %d", args->status);
292         grpc_linked_mdelem* glm = static_cast<grpc_linked_mdelem*>(
293             gbs->arena->Alloc(sizeof(grpc_linked_mdelem)));
294         glm->md = grpc_get_reffed_status_elem(args->status);
295         GPR_ASSERT(gbs->recv_trailing_metadata->LinkTail(glm) ==
296                    GRPC_ERROR_NONE);
297         gpr_log(GPR_INFO, "trailing_metadata = %p",
298                 gbs->recv_trailing_metadata);
299         gpr_log(GPR_INFO, "glm = %p", glm);
300       }
301       return GRPC_ERROR_NONE;
302     }();
303 
304     if (gbs->is_client || gbs->trailing_metadata_sent) {
305       grpc_closure* cb = gbs->recv_trailing_metadata_finished;
306       gbs->recv_trailing_metadata_finished = nullptr;
307       gbs->recv_trailing_metadata = nullptr;
308       grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
309     } else {
310       // According to transport explaineer - "Server extra: This op shouldn't
311       // actually be considered complete until the server has also sent trailing
312       // metadata to provide the other side with final status"
313       //
314       // We haven't sent trailing metadata yet, so we have to delay completing
315       // the recv_trailing_metadata callback.
316       gbs->need_to_call_trailing_metadata_callback = true;
317     }
318   }
319   GRPC_BINDER_STREAM_UNREF(gbs, "recv_trailing_metadata");
320 }
321 
perform_stream_op_locked(void * stream_op,grpc_error_handle)322 static void perform_stream_op_locked(void* stream_op,
323                                      grpc_error_handle /*error*/) {
324   grpc_transport_stream_op_batch* op =
325       static_cast<grpc_transport_stream_op_batch*>(stream_op);
326   grpc_binder_stream* gbs =
327       static_cast<grpc_binder_stream*>(op->handler_private.extra_arg);
328   grpc_binder_transport* gbt = gbs->t;
329   if (op->cancel_stream) {
330     // TODO(waynetu): Is this true?
331     GPR_ASSERT(!op->send_initial_metadata && !op->send_message &&
332                !op->send_trailing_metadata && !op->recv_initial_metadata &&
333                !op->recv_message && !op->recv_trailing_metadata);
334     gpr_log(GPR_INFO, "cancel_stream is_client = %d", gbs->is_client);
335     if (!gbs->is_client) {
336       // Send trailing metadata to inform the other end about the cancellation,
337       // regardless if we'd already done that or not.
338       grpc_binder::Transaction cancel_tx(gbs->GetTxCode(), gbt->is_client);
339       cancel_tx.SetSuffix(grpc_binder::Metadata{});
340       cancel_tx.SetStatus(1);
341       absl::Status status = gbt->wire_writer->RpcCall(cancel_tx);
342     }
343     cancel_stream_locked(gbt, gbs, op->payload->cancel_stream.cancel_error);
344     if (op->on_complete != nullptr) {
345       grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, GRPC_ERROR_NONE);
346     }
347     GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
348     return;
349   }
350 
351   if (gbs->is_closed) {
352     if (op->send_message) {
353       // Reset the send_message payload to prevent memory leaks.
354       op->payload->send_message.send_message.reset();
355     }
356     if (op->recv_initial_metadata) {
357       grpc_core::ExecCtx::Run(
358           DEBUG_LOCATION,
359           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
360           GRPC_ERROR_REF(gbs->cancel_self_error));
361     }
362     if (op->recv_message) {
363       grpc_core::ExecCtx::Run(DEBUG_LOCATION,
364                               op->payload->recv_message.recv_message_ready,
365                               GRPC_ERROR_REF(gbs->cancel_self_error));
366     }
367     if (op->recv_trailing_metadata) {
368       grpc_core::ExecCtx::Run(
369           DEBUG_LOCATION,
370           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
371           GRPC_ERROR_REF(gbs->cancel_self_error));
372     }
373     if (op->on_complete != nullptr) {
374       grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
375                               GRPC_ERROR_REF(gbs->cancel_self_error));
376     }
377     GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
378     return;
379   }
380 
381   int tx_code = gbs->tx_code;
382   grpc_binder::Transaction tx(tx_code, gbt->is_client);
383 
384   if (op->send_initial_metadata) {
385     gpr_log(GPR_INFO, "send_initial_metadata");
386     grpc_binder::Metadata init_md;
387     auto batch = op->payload->send_initial_metadata.send_initial_metadata;
388 
389     batch->ForEach([&](grpc_mdelem md) {
390       absl::string_view key = grpc_core::StringViewFromSlice(GRPC_MDKEY(md));
391       absl::string_view value =
392           grpc_core::StringViewFromSlice(GRPC_MDVALUE(md));
393       gpr_log(GPR_INFO, "send initial metatday key-value %s",
394               absl::StrCat(key, " ", value).c_str());
395       if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
396         // TODO(b/192208403): Figure out if it is correct to simply drop '/'
397         // prefix and treat it as rpc method name
398         GPR_ASSERT(value[0] == '/');
399         std::string path = std::string(value).substr(1);
400 
401         // Only client send method ref.
402         GPR_ASSERT(gbt->is_client);
403         tx.SetMethodRef(path);
404       } else {
405         init_md.emplace_back(std::string(key), std::string(value));
406       }
407     });
408     tx.SetPrefix(init_md);
409   }
410   if (op->send_message) {
411     gpr_log(GPR_INFO, "send_message");
412     size_t remaining = op->payload->send_message.send_message->length();
413     std::string message_data;
414     while (remaining > 0) {
415       grpc_slice message_slice;
416       // TODO(waynetu): Temporarily assume that the message is ready.
417       GPR_ASSERT(
418           op->payload->send_message.send_message->Next(SIZE_MAX, nullptr));
419       grpc_error_handle error =
420           op->payload->send_message.send_message->Pull(&message_slice);
421       // TODO(waynetu): Cancel the stream if error is not GRPC_ERROR_NONE.
422       GPR_ASSERT(error == GRPC_ERROR_NONE);
423       uint8_t* p = GRPC_SLICE_START_PTR(message_slice);
424       size_t len = GRPC_SLICE_LENGTH(message_slice);
425       remaining -= len;
426       message_data += std::string(reinterpret_cast<char*>(p), len);
427       grpc_slice_unref_internal(message_slice);
428     }
429     gpr_log(GPR_INFO, "message_data = %s", message_data.c_str());
430     tx.SetData(message_data);
431     // TODO(b/192369787): Are we supposed to reset here to avoid
432     // use-after-free issue in call.cc?
433     op->payload->send_message.send_message.reset();
434   }
435 
436   if (op->send_trailing_metadata) {
437     gpr_log(GPR_INFO, "send_trailing_metadata");
438     auto batch = op->payload->send_trailing_metadata.send_trailing_metadata;
439     grpc_binder::Metadata trailing_metadata;
440 
441     batch->ForEach([&](grpc_mdelem md) {
442       // Client will not send trailing metadata.
443       GPR_ASSERT(!gbt->is_client);
444 
445       if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS)) {
446         int status = grpc_get_status_code_from_metadata(md);
447         gpr_log(GPR_INFO, "send trailing metadata status = %d", status);
448         tx.SetStatus(status);
449       } else {
450         absl::string_view key = grpc_core::StringViewFromSlice(GRPC_MDKEY(md));
451         absl::string_view value =
452             grpc_core::StringViewFromSlice(GRPC_MDVALUE(md));
453         gpr_log(GPR_INFO, "send trailing metatday key-value %s",
454                 absl::StrCat(key, " ", value).c_str());
455         trailing_metadata.emplace_back(std::string(key), std::string(value));
456       }
457     });
458     // TODO(mingcl): Will we ever has key-value pair here? According to
459     // wireformat client suffix data is always empty.
460     tx.SetSuffix(trailing_metadata);
461   }
462   if (op->recv_initial_metadata) {
463     gpr_log(GPR_INFO, "recv_initial_metadata");
464     gbs->recv_initial_metadata_ready =
465         op->payload->recv_initial_metadata.recv_initial_metadata_ready;
466     gbs->recv_initial_metadata =
467         op->payload->recv_initial_metadata.recv_initial_metadata;
468     gbs->trailing_metadata_available =
469         op->payload->recv_initial_metadata.trailing_metadata_available;
470     GRPC_BINDER_STREAM_REF(gbs, "recv_initial_metadata");
471     gbt->transport_stream_receiver->RegisterRecvInitialMetadata(
472         tx_code, [tx_code, gbs,
473                   gbt](absl::StatusOr<grpc_binder::Metadata> initial_metadata) {
474           grpc_core::ExecCtx exec_ctx;
475           gbs->recv_initial_metadata_args.tx_code = tx_code;
476           gbs->recv_initial_metadata_args.initial_metadata =
477               std::move(initial_metadata);
478           gbt->combiner->Run(
479               GRPC_CLOSURE_INIT(&gbs->recv_initial_metadata_closure,
480                                 recv_initial_metadata_locked,
481                                 &gbs->recv_initial_metadata_args, nullptr),
482               GRPC_ERROR_NONE);
483         });
484   }
485   if (op->recv_message) {
486     gpr_log(GPR_INFO, "recv_message");
487     gbs->recv_message_ready = op->payload->recv_message.recv_message_ready;
488     gbs->recv_message = op->payload->recv_message.recv_message;
489     gbs->call_failed_before_recv_message =
490         op->payload->recv_message.call_failed_before_recv_message;
491     GRPC_BINDER_STREAM_REF(gbs, "recv_message");
492     gbt->transport_stream_receiver->RegisterRecvMessage(
493         tx_code, [tx_code, gbs, gbt](absl::StatusOr<std::string> message) {
494           grpc_core::ExecCtx exec_ctx;
495           gbs->recv_message_args.tx_code = tx_code;
496           gbs->recv_message_args.message = std::move(message);
497           gbt->combiner->Run(
498               GRPC_CLOSURE_INIT(&gbs->recv_message_closure, recv_message_locked,
499                                 &gbs->recv_message_args, nullptr),
500               GRPC_ERROR_NONE);
501         });
502   }
503   if (op->recv_trailing_metadata) {
504     gpr_log(GPR_INFO, "recv_trailing_metadata");
505     gbs->recv_trailing_metadata_finished =
506         op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
507     gbs->recv_trailing_metadata =
508         op->payload->recv_trailing_metadata.recv_trailing_metadata;
509     GRPC_BINDER_STREAM_REF(gbs, "recv_trailing_metadata");
510     gbt->transport_stream_receiver->RegisterRecvTrailingMetadata(
511         tx_code, [tx_code, gbs, gbt](
512                      absl::StatusOr<grpc_binder::Metadata> trailing_metadata,
513                      int status) {
514           grpc_core::ExecCtx exec_ctx;
515           gbs->recv_trailing_metadata_args.tx_code = tx_code;
516           gbs->recv_trailing_metadata_args.trailing_metadata =
517               std::move(trailing_metadata);
518           gbs->recv_trailing_metadata_args.status = status;
519           gbt->combiner->Run(
520               GRPC_CLOSURE_INIT(&gbs->recv_trailing_metadata_closure,
521                                 recv_trailing_metadata_locked,
522                                 &gbs->recv_trailing_metadata_args, nullptr),
523               GRPC_ERROR_NONE);
524         });
525   }
526   // Only send transaction when there's a send op presented.
527   absl::Status status = absl::OkStatus();
528   if (op->send_initial_metadata || op->send_message ||
529       op->send_trailing_metadata) {
530     // TODO(waynetu): RpcCall() is doing a lot of work (including waiting for
531     // acknowledgements from the other side). Consider delaying this operation
532     // with combiner.
533     status = gbt->wire_writer->RpcCall(tx);
534     if (!gbs->is_client && op->send_trailing_metadata) {
535       gbs->trailing_metadata_sent = true;
536       // According to transport explaineer - "Server extra: This op shouldn't
537       // actually be considered complete until the server has also sent trailing
538       // metadata to provide the other side with final status"
539       //
540       // Because we've done sending trailing metadata here, we can safely
541       // complete the recv_trailing_metadata callback here.
542       if (gbs->need_to_call_trailing_metadata_callback) {
543         grpc_closure* cb = gbs->recv_trailing_metadata_finished;
544         gbs->recv_trailing_metadata_finished = nullptr;
545         grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
546         gbs->need_to_call_trailing_metadata_callback = false;
547       }
548     }
549   }
550   // Note that this should only be scheduled when all non-recv ops are
551   // completed
552   if (op->on_complete != nullptr) {
553     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
554                             absl_status_to_grpc_error(status));
555     gpr_log(GPR_INFO, "on_complete closure schuduled");
556   }
557   GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
558 }
559 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)560 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
561                               grpc_transport_stream_op_batch* op) {
562   GPR_TIMER_SCOPE("perform_stream_op", 0);
563   grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
564   grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
565   gpr_log(GPR_INFO, "%s = %p %p %p is_client = %d", __func__, gt, gs, op,
566           gbs->is_client);
567   GRPC_BINDER_STREAM_REF(gbs, "perform_stream_op");
568   op->handler_private.extra_arg = gbs;
569   gbt->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
570                                        perform_stream_op_locked, op, nullptr),
571                      GRPC_ERROR_NONE);
572 }
573 
close_transport_locked(grpc_binder_transport * gbt)574 static void close_transport_locked(grpc_binder_transport* gbt) {
575   gbt->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(),
576                               "transport closed due to disconnection/goaway");
577   while (!gbt->registered_stream.empty()) {
578     cancel_stream_locked(
579         gbt, gbt->registered_stream.begin()->second,
580         grpc_error_set_int(
581             GRPC_ERROR_CREATE_FROM_STATIC_STRING("transport closed"),
582             GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
583   }
584 }
585 
perform_transport_op_locked(void * transport_op,grpc_error_handle)586 static void perform_transport_op_locked(void* transport_op,
587                                         grpc_error_handle /*error*/) {
588   grpc_transport_op* op = static_cast<grpc_transport_op*>(transport_op);
589   grpc_binder_transport* gbt =
590       static_cast<grpc_binder_transport*>(op->handler_private.extra_arg);
591   // TODO(waynetu): Should we lock here to avoid data race?
592   if (op->start_connectivity_watch != nullptr) {
593     gbt->state_tracker.AddWatcher(op->start_connectivity_watch_state,
594                                   std::move(op->start_connectivity_watch));
595   }
596   if (op->stop_connectivity_watch != nullptr) {
597     gbt->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
598   }
599   if (op->set_accept_stream) {
600     gbt->accept_stream_fn = op->set_accept_stream_fn;
601     gbt->accept_stream_user_data = op->set_accept_stream_user_data;
602   }
603   if (op->on_consumed) {
604     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
605   }
606   bool do_close = false;
607   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
608     do_close = true;
609     GRPC_ERROR_UNREF(op->disconnect_with_error);
610   }
611   if (op->goaway_error != GRPC_ERROR_NONE) {
612     do_close = true;
613     GRPC_ERROR_UNREF(op->goaway_error);
614   }
615   if (do_close) {
616     close_transport_locked(gbt);
617   }
618   GRPC_BINDER_UNREF_TRANSPORT(gbt, "perform_transport_op");
619 }
620 
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)621 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
622   gpr_log(GPR_INFO, __func__);
623   grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
624   op->handler_private.extra_arg = gbt;
625   GRPC_BINDER_REF_TRANSPORT(gbt, "perform_transport_op");
626   gbt->combiner->Run(
627       GRPC_CLOSURE_INIT(&op->handler_private.closure,
628                         perform_transport_op_locked, op, nullptr),
629       GRPC_ERROR_NONE);
630 }
631 
destroy_stream_locked(void * sp,grpc_error_handle)632 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
633   grpc_binder_stream* gbs = static_cast<grpc_binder_stream*>(sp);
634   grpc_binder_transport* gbt = gbs->t;
635   cancel_stream_locked(
636       gbt, gbs,
637       grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("destroy stream"),
638                          GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
639   gbs->~grpc_binder_stream();
640 }
641 
destroy_stream(grpc_transport *,grpc_stream * gs,grpc_closure * then_schedule_closure)642 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
643                            grpc_closure* then_schedule_closure) {
644   gpr_log(GPR_INFO, __func__);
645   grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
646   gbs->destroy_stream_then_closure = then_schedule_closure;
647   gbs->t->combiner->Run(GRPC_CLOSURE_INIT(&gbs->destroy_stream,
648                                           destroy_stream_locked, gbs, nullptr),
649                         GRPC_ERROR_NONE);
650 }
651 
destroy_transport_locked(void * gt,grpc_error_handle)652 static void destroy_transport_locked(void* gt, grpc_error_handle /*error*/) {
653   grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
654   close_transport_locked(gbt);
655   // Release the references held by the transport.
656   gbt->wire_reader = nullptr;
657   gbt->transport_stream_receiver = nullptr;
658   gbt->wire_writer = nullptr;
659   GRPC_BINDER_UNREF_TRANSPORT(gbt, "transport destroyed");
660 }
661 
destroy_transport(grpc_transport * gt)662 static void destroy_transport(grpc_transport* gt) {
663   gpr_log(GPR_INFO, __func__);
664   grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
665   gbt->combiner->Run(
666       GRPC_CLOSURE_CREATE(destroy_transport_locked, gbt, nullptr),
667       GRPC_ERROR_NONE);
668 }
669 
get_endpoint(grpc_transport *)670 static grpc_endpoint* get_endpoint(grpc_transport*) {
671   gpr_log(GPR_INFO, __func__);
672   return nullptr;
673 }
674 
675 // See grpc_transport_vtable declaration for meaning of each field
676 static const grpc_transport_vtable vtable = {sizeof(grpc_binder_stream),
677                                              "binder",
678                                              init_stream,
679                                              set_pollset,
680                                              set_pollset_set,
681                                              perform_stream_op,
682                                              perform_transport_op,
683                                              destroy_stream,
684                                              destroy_transport,
685                                              get_endpoint};
686 
get_vtable()687 static const grpc_transport_vtable* get_vtable() { return &vtable; }
688 
accept_stream_locked(void * gt,grpc_error_handle)689 static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) {
690   grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
691   if (gbt->accept_stream_fn) {
692     // must pass in a non-null value.
693     (*gbt->accept_stream_fn)(gbt->accept_stream_user_data, &gbt->base, gbt);
694   }
695 }
696 
grpc_binder_transport(std::unique_ptr<grpc_binder::Binder> binder,bool is_client,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)697 grpc_binder_transport::grpc_binder_transport(
698     std::unique_ptr<grpc_binder::Binder> binder, bool is_client,
699     std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)
700     : is_client(is_client),
701       combiner(grpc_combiner_create()),
702       state_tracker(is_client ? "binder_transport_client"
703                               : "binder_transport_server"),
704       refs(1, nullptr) {
705   gpr_log(GPR_INFO, __func__);
706   base.vtable = get_vtable();
707   GRPC_CLOSURE_INIT(&accept_stream_closure, accept_stream_locked, this,
708                     nullptr);
709   transport_stream_receiver =
710       std::make_shared<grpc_binder::TransportStreamReceiverImpl>(
711           is_client, /*accept_stream_callback=*/[this] {
712             grpc_core::ExecCtx exec_ctx;
713             combiner->Run(&accept_stream_closure, GRPC_ERROR_NONE);
714           });
715   // WireReader holds a ref to grpc_binder_transport.
716   GRPC_BINDER_REF_TRANSPORT(this, "wire reader");
717   wire_reader = grpc_core::MakeOrphanable<grpc_binder::WireReaderImpl>(
718       transport_stream_receiver, is_client, security_policy,
719       /*on_destruct_callback=*/
720       [this] {
721         // Unref transport when destructed.
722         GRPC_BINDER_UNREF_TRANSPORT(this, "wire reader");
723       });
724   wire_writer = wire_reader->SetupTransport(std::move(binder));
725 }
726 
~grpc_binder_transport()727 grpc_binder_transport::~grpc_binder_transport() {
728   GRPC_COMBINER_UNREF(combiner, "binder_transport");
729 }
730 
grpc_create_binder_transport_client(std::unique_ptr<grpc_binder::Binder> endpoint_binder,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)731 grpc_transport* grpc_create_binder_transport_client(
732     std::unique_ptr<grpc_binder::Binder> endpoint_binder,
733     std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
734         security_policy) {
735   gpr_log(GPR_INFO, __func__);
736 
737   GPR_ASSERT(endpoint_binder != nullptr);
738   GPR_ASSERT(security_policy != nullptr);
739 
740   grpc_binder_transport* t = new grpc_binder_transport(
741       std::move(endpoint_binder), /*is_client=*/true, security_policy);
742 
743   return &t->base;
744 }
745 
grpc_create_binder_transport_server(std::unique_ptr<grpc_binder::Binder> client_binder,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)746 grpc_transport* grpc_create_binder_transport_server(
747     std::unique_ptr<grpc_binder::Binder> client_binder,
748     std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
749         security_policy) {
750   gpr_log(GPR_INFO, __func__);
751 
752   GPR_ASSERT(client_binder != nullptr);
753   GPR_ASSERT(security_policy != nullptr);
754 
755   grpc_binder_transport* t = new grpc_binder_transport(
756       std::move(client_binder), /*is_client=*/false, security_policy);
757 
758   return &t->base;
759 }
760 #endif
761