1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/ext/transport/binder/transport/binder_transport.h"
18
19 #ifndef GRPC_NO_BINDER
20
21 #include <cstdint>
22 #include <memory>
23 #include <string>
24 #include <utility>
25
26 #include "absl/memory/memory.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/substitute.h"
29
30 #include <grpc/support/log.h>
31
32 #include "src/core/ext/transport/binder/transport/binder_stream.h"
33 #include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
34 #include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h"
35 #include "src/core/ext/transport/binder/wire_format/wire_reader.h"
36 #include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
37 #include "src/core/ext/transport/binder/wire_format/wire_writer.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/slice/slice_utils.h"
40 #include "src/core/lib/transport/byte_stream.h"
41 #include "src/core/lib/transport/error_utils.h"
42 #include "src/core/lib/transport/metadata_batch.h"
43 #include "src/core/lib/transport/static_metadata.h"
44 #include "src/core/lib/transport/status_metadata.h"
45 #include "src/core/lib/transport/transport.h"
46
47 #ifndef NDEBUG
grpc_binder_stream_ref(grpc_binder_stream * s,const char * reason)48 static void grpc_binder_stream_ref(grpc_binder_stream* s, const char* reason) {
49 grpc_stream_ref(s->refcount, reason);
50 }
grpc_binder_stream_unref(grpc_binder_stream * s,const char * reason)51 static void grpc_binder_stream_unref(grpc_binder_stream* s,
52 const char* reason) {
53 grpc_stream_unref(s->refcount, reason);
54 }
grpc_binder_ref_transport(grpc_binder_transport * t,const char * reason,const char * file,int line)55 static void grpc_binder_ref_transport(grpc_binder_transport* t,
56 const char* reason, const char* file,
57 int line) {
58 t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
59 }
grpc_binder_unref_transport(grpc_binder_transport * t,const char * reason,const char * file,int line)60 static void grpc_binder_unref_transport(grpc_binder_transport* t,
61 const char* reason, const char* file,
62 int line) {
63 if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
64 delete t;
65 }
66 }
67 #else
grpc_binder_stream_ref(grpc_binder_stream * s)68 static void grpc_binder_stream_ref(grpc_binder_stream* s) {
69 grpc_stream_ref(s->refcount);
70 }
grpc_binder_stream_unref(grpc_binder_stream * s)71 static void grpc_binder_stream_unref(grpc_binder_stream* s) {
72 grpc_stream_unref(s->refcount);
73 }
grpc_binder_ref_transport(grpc_binder_transport * t)74 static void grpc_binder_ref_transport(grpc_binder_transport* t) {
75 t->refs.Ref();
76 }
grpc_binder_unref_transport(grpc_binder_transport * t)77 static void grpc_binder_unref_transport(grpc_binder_transport* t) {
78 if (t->refs.Unref()) {
79 delete t;
80 }
81 }
82 #endif
83
// Ref-counting entry points used throughout this file. In release builds
// (NDEBUG) the `why` argument is dropped; in debug builds it is forwarded, and
// the transport variants additionally record the call site.
#ifdef NDEBUG
#define GRPC_BINDER_STREAM_REF(s, why) grpc_binder_stream_ref(s)
#define GRPC_BINDER_STREAM_UNREF(s, why) grpc_binder_stream_unref(s)
#define GRPC_BINDER_REF_TRANSPORT(transport, why) \
  grpc_binder_ref_transport(transport)
#define GRPC_BINDER_UNREF_TRANSPORT(transport, why) \
  grpc_binder_unref_transport(transport)
#else
#define GRPC_BINDER_STREAM_REF(s, why) grpc_binder_stream_ref(s, why)
#define GRPC_BINDER_STREAM_UNREF(s, why) grpc_binder_stream_unref(s, why)
#define GRPC_BINDER_REF_TRANSPORT(transport, why) \
  grpc_binder_ref_transport(transport, why, __FILE__, __LINE__)
#define GRPC_BINDER_UNREF_TRANSPORT(transport, why) \
  grpc_binder_unref_transport(transport, why, __FILE__, __LINE__)
#endif
100
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)101 static int init_stream(grpc_transport* gt, grpc_stream* gs,
102 grpc_stream_refcount* refcount, const void* server_data,
103 grpc_core::Arena* arena) {
104 GPR_TIMER_SCOPE("init_stream", 0);
105 gpr_log(GPR_INFO, "%s = %p %p %p %p %p", __func__, gt, gs, refcount,
106 server_data, arena);
107 grpc_binder_transport* t = reinterpret_cast<grpc_binder_transport*>(gt);
108 // TODO(mingcl): Figure out if we need to worry about concurrent invocation
109 // here
110 new (gs) grpc_binder_stream(t, refcount, server_data, arena,
111 t->NewStreamTxCode(), t->is_client);
112 return 0;
113 }
114
set_pollset(grpc_transport * gt,grpc_stream * gs,grpc_pollset * gp)115 static void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* gp) {
116 gpr_log(GPR_INFO, "%s = %p %p %p", __func__, gt, gs, gp);
117 }
118
set_pollset_set(grpc_transport *,grpc_stream *,grpc_pollset_set *)119 static void set_pollset_set(grpc_transport*, grpc_stream*, grpc_pollset_set*) {
120 gpr_log(GPR_INFO, __func__);
121 }
122
AssignMetadata(grpc_metadata_batch * mb,const grpc_binder::Metadata & md)123 static void AssignMetadata(grpc_metadata_batch* mb,
124 const grpc_binder::Metadata& md) {
125 mb->Clear();
126 for (auto& p : md) {
127 mb->Append(p.first, grpc_core::Slice::FromCopiedString(p.second));
128 }
129 }
130
cancel_stream_locked(grpc_binder_transport * gbt,grpc_binder_stream * gbs,grpc_error_handle error)131 static void cancel_stream_locked(grpc_binder_transport* gbt,
132 grpc_binder_stream* gbs,
133 grpc_error_handle error) {
134 gpr_log(GPR_INFO, "cancel_stream_locked");
135 if (!gbs->is_closed) {
136 GPR_ASSERT(gbs->cancel_self_error == GRPC_ERROR_NONE);
137 gbs->is_closed = true;
138 gbs->cancel_self_error = GRPC_ERROR_REF(error);
139 gbt->transport_stream_receiver->CancelStream(gbs->tx_code);
140 gbt->registered_stream.erase(gbs->tx_code);
141 if (gbs->recv_initial_metadata_ready != nullptr) {
142 grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_initial_metadata_ready,
143 GRPC_ERROR_REF(error));
144 gbs->recv_initial_metadata_ready = nullptr;
145 gbs->recv_initial_metadata = nullptr;
146 gbs->trailing_metadata_available = nullptr;
147 }
148 if (gbs->recv_message_ready != nullptr) {
149 grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_message_ready,
150 GRPC_ERROR_REF(error));
151 gbs->recv_message_ready = nullptr;
152 gbs->recv_message->reset();
153 gbs->recv_message = nullptr;
154 gbs->call_failed_before_recv_message = nullptr;
155 }
156 if (gbs->recv_trailing_metadata_finished != nullptr) {
157 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
158 gbs->recv_trailing_metadata_finished,
159 GRPC_ERROR_REF(error));
160 gbs->recv_trailing_metadata_finished = nullptr;
161 gbs->recv_trailing_metadata = nullptr;
162 }
163 }
164 GRPC_ERROR_UNREF(error);
165 }
166
ContainsAuthorityAndPath(const grpc_binder::Metadata & metadata)167 static bool ContainsAuthorityAndPath(const grpc_binder::Metadata& metadata) {
168 bool has_authority = false;
169 bool has_path = false;
170 for (const auto& kv : metadata) {
171 if (kv.first == grpc_core::StringViewFromSlice(GRPC_MDSTR_AUTHORITY)) {
172 has_authority = true;
173 }
174 if (kv.first == grpc_core::StringViewFromSlice(GRPC_MDSTR_PATH)) {
175 has_path = true;
176 }
177 }
178 return has_authority && has_path;
179 }
180
recv_initial_metadata_locked(void * arg,grpc_error_handle)181 static void recv_initial_metadata_locked(void* arg,
182 grpc_error_handle /*error*/) {
183 RecvInitialMetadataArgs* args = static_cast<RecvInitialMetadataArgs*>(arg);
184 grpc_binder_stream* gbs = args->gbs;
185
186 gpr_log(GPR_INFO,
187 "recv_initial_metadata_locked is_client = %d is_closed = %d",
188 gbs->is_client, gbs->is_closed);
189
190 if (!gbs->is_closed) {
191 grpc_error_handle error = [&] {
192 GPR_ASSERT(gbs->recv_initial_metadata);
193 GPR_ASSERT(gbs->recv_initial_metadata_ready);
194 if (!args->initial_metadata.ok()) {
195 gpr_log(GPR_ERROR, "Failed to parse initial metadata");
196 return absl_status_to_grpc_error(args->initial_metadata.status());
197 }
198 if (!gbs->is_client) {
199 // For server, we expect :authority and :path in initial metadata.
200 if (!ContainsAuthorityAndPath(*args->initial_metadata)) {
201 return GRPC_ERROR_CREATE_FROM_CPP_STRING(
202 "Missing :authority or :path in initial metadata");
203 }
204 }
205 AssignMetadata(gbs->recv_initial_metadata, *args->initial_metadata);
206 return GRPC_ERROR_NONE;
207 }();
208
209 grpc_closure* cb = gbs->recv_initial_metadata_ready;
210 gbs->recv_initial_metadata_ready = nullptr;
211 gbs->recv_initial_metadata = nullptr;
212 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
213 }
214 GRPC_BINDER_STREAM_UNREF(gbs, "recv_initial_metadata");
215 }
216
recv_message_locked(void * arg,grpc_error_handle)217 static void recv_message_locked(void* arg, grpc_error_handle /*error*/) {
218 RecvMessageArgs* args = static_cast<RecvMessageArgs*>(arg);
219 grpc_binder_stream* gbs = args->gbs;
220
221 gpr_log(GPR_INFO, "recv_message_locked is_client = %d is_closed = %d",
222 gbs->is_client, gbs->is_closed);
223
224 if (!gbs->is_closed) {
225 grpc_error_handle error = [&] {
226 GPR_ASSERT(gbs->recv_message);
227 GPR_ASSERT(gbs->recv_message_ready);
228 if (!args->message.ok()) {
229 gpr_log(GPR_ERROR, "Failed to receive message");
230 if (args->message.status().message() ==
231 grpc_binder::TransportStreamReceiver::
232 kGrpcBinderTransportCancelledGracefully) {
233 gpr_log(GPR_ERROR, "message cancelled gracefully");
234 // Cancelled because we've already received trailing metadata.
235 // It's not an error in this case.
236 return GRPC_ERROR_NONE;
237 } else {
238 return absl_status_to_grpc_error(args->message.status());
239 }
240 }
241 grpc_slice_buffer buf;
242 grpc_slice_buffer_init(&buf);
243 grpc_slice_buffer_add(&buf, grpc_slice_from_cpp_string(*args->message));
244
245 gbs->sbs.Init(&buf, 0);
246 gbs->recv_message->reset(gbs->sbs.get());
247 return GRPC_ERROR_NONE;
248 }();
249
250 if (error != GRPC_ERROR_NONE &&
251 gbs->call_failed_before_recv_message != nullptr) {
252 *gbs->call_failed_before_recv_message = true;
253 }
254 grpc_closure* cb = gbs->recv_message_ready;
255 gbs->recv_message_ready = nullptr;
256 gbs->recv_message = nullptr;
257 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
258 }
259
260 GRPC_BINDER_STREAM_UNREF(gbs, "recv_message");
261 }
262
recv_trailing_metadata_locked(void * arg,grpc_error_handle)263 static void recv_trailing_metadata_locked(void* arg,
264 grpc_error_handle /*error*/) {
265 RecvTrailingMetadataArgs* args = static_cast<RecvTrailingMetadataArgs*>(arg);
266 grpc_binder_stream* gbs = args->gbs;
267
268 gpr_log(GPR_INFO,
269 "recv_trailing_metadata_locked is_client = %d is_closed = %d",
270 gbs->is_client, gbs->is_closed);
271
272 if (!gbs->is_closed) {
273 grpc_error_handle error = [&] {
274 GPR_ASSERT(gbs->recv_trailing_metadata);
275 GPR_ASSERT(gbs->recv_trailing_metadata_finished);
276 if (!args->trailing_metadata.ok()) {
277 gpr_log(GPR_ERROR, "Failed to receive trailing metadata");
278 return absl_status_to_grpc_error(args->trailing_metadata.status());
279 }
280 if (!gbs->is_client) {
281 // Client will not send non-empty trailing metadata.
282 if (!args->trailing_metadata.value().empty()) {
283 gpr_log(GPR_ERROR, "Server receives non-empty trailing metadata.");
284 return GRPC_ERROR_CANCELLED;
285 }
286 } else {
287 AssignMetadata(gbs->recv_trailing_metadata, *args->trailing_metadata);
288 // Append status to metadata
289 // TODO(b/192208695): See if we can avoid to manually put status
290 // code into the header
291 gpr_log(GPR_INFO, "status = %d", args->status);
292 grpc_linked_mdelem* glm = static_cast<grpc_linked_mdelem*>(
293 gbs->arena->Alloc(sizeof(grpc_linked_mdelem)));
294 glm->md = grpc_get_reffed_status_elem(args->status);
295 GPR_ASSERT(gbs->recv_trailing_metadata->LinkTail(glm) ==
296 GRPC_ERROR_NONE);
297 gpr_log(GPR_INFO, "trailing_metadata = %p",
298 gbs->recv_trailing_metadata);
299 gpr_log(GPR_INFO, "glm = %p", glm);
300 }
301 return GRPC_ERROR_NONE;
302 }();
303
304 if (gbs->is_client || gbs->trailing_metadata_sent) {
305 grpc_closure* cb = gbs->recv_trailing_metadata_finished;
306 gbs->recv_trailing_metadata_finished = nullptr;
307 gbs->recv_trailing_metadata = nullptr;
308 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
309 } else {
310 // According to transport explaineer - "Server extra: This op shouldn't
311 // actually be considered complete until the server has also sent trailing
312 // metadata to provide the other side with final status"
313 //
314 // We haven't sent trailing metadata yet, so we have to delay completing
315 // the recv_trailing_metadata callback.
316 gbs->need_to_call_trailing_metadata_callback = true;
317 }
318 }
319 GRPC_BINDER_STREAM_UNREF(gbs, "recv_trailing_metadata");
320 }
321
322 namespace grpc_binder {
323 namespace {
324
325 class MetadataEncoder {
326 public:
MetadataEncoder(bool is_client,Transaction * tx,Metadata * init_md)327 MetadataEncoder(bool is_client, Transaction* tx, Metadata* init_md)
328 : is_client_(is_client), tx_(tx), init_md_(init_md) {}
329
Encode(grpc_mdelem md)330 void Encode(grpc_mdelem md) {
331 absl::string_view key = grpc_core::StringViewFromSlice(GRPC_MDKEY(md));
332 absl::string_view value = grpc_core::StringViewFromSlice(GRPC_MDVALUE(md));
333 gpr_log(GPR_INFO, "send metadata key-value %s",
334 absl::StrCat(key, " ", value).c_str());
335 if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
336 // TODO(b/192208403): Figure out if it is correct to simply drop '/'
337 // prefix and treat it as rpc method name
338 GPR_ASSERT(value[0] == '/');
339 std::string path = std::string(value).substr(1);
340
341 // Only client send method ref.
342 GPR_ASSERT(is_client_);
343 tx_->SetMethodRef(path);
344 } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS)) {
345 int status = grpc_get_status_code_from_metadata(md);
346 gpr_log(GPR_INFO, "send trailing metadata status = %d", status);
347 tx_->SetStatus(status);
348 } else {
349 init_md_->emplace_back(std::string(key), std::string(value));
350 }
351 }
352
353 template <typename Trait>
Encode(Trait,const typename Trait::ValueType & value)354 void Encode(Trait, const typename Trait::ValueType& value) {
355 init_md_->emplace_back(std::string(Trait::key()),
356 std::string(Trait::Encode(value).as_string_view()));
357 }
358
359 private:
360 const bool is_client_;
361 Transaction* const tx_;
362 Metadata* const init_md_;
363 };
364
365 } // namespace
366 } // namespace grpc_binder
367
perform_stream_op_locked(void * stream_op,grpc_error_handle)368 static void perform_stream_op_locked(void* stream_op,
369 grpc_error_handle /*error*/) {
370 grpc_transport_stream_op_batch* op =
371 static_cast<grpc_transport_stream_op_batch*>(stream_op);
372 grpc_binder_stream* gbs =
373 static_cast<grpc_binder_stream*>(op->handler_private.extra_arg);
374 grpc_binder_transport* gbt = gbs->t;
375 if (op->cancel_stream) {
376 // TODO(waynetu): Is this true?
377 GPR_ASSERT(!op->send_initial_metadata && !op->send_message &&
378 !op->send_trailing_metadata && !op->recv_initial_metadata &&
379 !op->recv_message && !op->recv_trailing_metadata);
380 gpr_log(GPR_INFO, "cancel_stream is_client = %d", gbs->is_client);
381 if (!gbs->is_client) {
382 // Send trailing metadata to inform the other end about the cancellation,
383 // regardless if we'd already done that or not.
384 grpc_binder::Transaction cancel_tx(gbs->GetTxCode(), gbt->is_client);
385 cancel_tx.SetSuffix(grpc_binder::Metadata{});
386 cancel_tx.SetStatus(1);
387 absl::Status status = gbt->wire_writer->RpcCall(cancel_tx);
388 }
389 cancel_stream_locked(gbt, gbs, op->payload->cancel_stream.cancel_error);
390 if (op->on_complete != nullptr) {
391 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, GRPC_ERROR_NONE);
392 }
393 GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
394 return;
395 }
396
397 if (gbs->is_closed) {
398 if (op->send_message) {
399 // Reset the send_message payload to prevent memory leaks.
400 op->payload->send_message.send_message.reset();
401 }
402 if (op->recv_initial_metadata) {
403 grpc_core::ExecCtx::Run(
404 DEBUG_LOCATION,
405 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
406 GRPC_ERROR_REF(gbs->cancel_self_error));
407 }
408 if (op->recv_message) {
409 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
410 op->payload->recv_message.recv_message_ready,
411 GRPC_ERROR_REF(gbs->cancel_self_error));
412 }
413 if (op->recv_trailing_metadata) {
414 grpc_core::ExecCtx::Run(
415 DEBUG_LOCATION,
416 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
417 GRPC_ERROR_REF(gbs->cancel_self_error));
418 }
419 if (op->on_complete != nullptr) {
420 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
421 GRPC_ERROR_REF(gbs->cancel_self_error));
422 }
423 GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
424 return;
425 }
426
427 int tx_code = gbs->tx_code;
428 grpc_binder::Transaction tx(tx_code, gbt->is_client);
429
430 if (op->send_initial_metadata) {
431 gpr_log(GPR_INFO, "send_initial_metadata");
432 grpc_binder::Metadata init_md;
433 auto batch = op->payload->send_initial_metadata.send_initial_metadata;
434
435 grpc_binder::MetadataEncoder encoder(gbt->is_client, &tx, &init_md);
436 batch->Encode(&encoder);
437 tx.SetPrefix(init_md);
438 }
439 if (op->send_message) {
440 gpr_log(GPR_INFO, "send_message");
441 size_t remaining = op->payload->send_message.send_message->length();
442 std::string message_data;
443 while (remaining > 0) {
444 grpc_slice message_slice;
445 // TODO(waynetu): Temporarily assume that the message is ready.
446 GPR_ASSERT(
447 op->payload->send_message.send_message->Next(SIZE_MAX, nullptr));
448 grpc_error_handle error =
449 op->payload->send_message.send_message->Pull(&message_slice);
450 // TODO(waynetu): Cancel the stream if error is not GRPC_ERROR_NONE.
451 GPR_ASSERT(error == GRPC_ERROR_NONE);
452 uint8_t* p = GRPC_SLICE_START_PTR(message_slice);
453 size_t len = GRPC_SLICE_LENGTH(message_slice);
454 remaining -= len;
455 message_data += std::string(reinterpret_cast<char*>(p), len);
456 grpc_slice_unref_internal(message_slice);
457 }
458 gpr_log(GPR_INFO, "message_data = %s", message_data.c_str());
459 tx.SetData(message_data);
460 // TODO(b/192369787): Are we supposed to reset here to avoid
461 // use-after-free issue in call.cc?
462 op->payload->send_message.send_message.reset();
463 }
464
465 if (op->send_trailing_metadata) {
466 gpr_log(GPR_INFO, "send_trailing_metadata");
467 auto batch = op->payload->send_trailing_metadata.send_trailing_metadata;
468 grpc_binder::Metadata trailing_metadata;
469
470 grpc_binder::MetadataEncoder encoder(gbt->is_client, &tx,
471 &trailing_metadata);
472 batch->Encode(&encoder);
473
474 // TODO(mingcl): Will we ever has key-value pair here? According to
475 // wireformat client suffix data is always empty.
476 tx.SetSuffix(trailing_metadata);
477 }
478 if (op->recv_initial_metadata) {
479 gpr_log(GPR_INFO, "recv_initial_metadata");
480 gbs->recv_initial_metadata_ready =
481 op->payload->recv_initial_metadata.recv_initial_metadata_ready;
482 gbs->recv_initial_metadata =
483 op->payload->recv_initial_metadata.recv_initial_metadata;
484 gbs->trailing_metadata_available =
485 op->payload->recv_initial_metadata.trailing_metadata_available;
486 GRPC_BINDER_STREAM_REF(gbs, "recv_initial_metadata");
487 gbt->transport_stream_receiver->RegisterRecvInitialMetadata(
488 tx_code, [tx_code, gbs,
489 gbt](absl::StatusOr<grpc_binder::Metadata> initial_metadata) {
490 grpc_core::ExecCtx exec_ctx;
491 gbs->recv_initial_metadata_args.tx_code = tx_code;
492 gbs->recv_initial_metadata_args.initial_metadata =
493 std::move(initial_metadata);
494 gbt->combiner->Run(
495 GRPC_CLOSURE_INIT(&gbs->recv_initial_metadata_closure,
496 recv_initial_metadata_locked,
497 &gbs->recv_initial_metadata_args, nullptr),
498 GRPC_ERROR_NONE);
499 });
500 }
501 if (op->recv_message) {
502 gpr_log(GPR_INFO, "recv_message");
503 gbs->recv_message_ready = op->payload->recv_message.recv_message_ready;
504 gbs->recv_message = op->payload->recv_message.recv_message;
505 gbs->call_failed_before_recv_message =
506 op->payload->recv_message.call_failed_before_recv_message;
507 GRPC_BINDER_STREAM_REF(gbs, "recv_message");
508 gbt->transport_stream_receiver->RegisterRecvMessage(
509 tx_code, [tx_code, gbs, gbt](absl::StatusOr<std::string> message) {
510 grpc_core::ExecCtx exec_ctx;
511 gbs->recv_message_args.tx_code = tx_code;
512 gbs->recv_message_args.message = std::move(message);
513 gbt->combiner->Run(
514 GRPC_CLOSURE_INIT(&gbs->recv_message_closure, recv_message_locked,
515 &gbs->recv_message_args, nullptr),
516 GRPC_ERROR_NONE);
517 });
518 }
519 if (op->recv_trailing_metadata) {
520 gpr_log(GPR_INFO, "recv_trailing_metadata");
521 gbs->recv_trailing_metadata_finished =
522 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
523 gbs->recv_trailing_metadata =
524 op->payload->recv_trailing_metadata.recv_trailing_metadata;
525 GRPC_BINDER_STREAM_REF(gbs, "recv_trailing_metadata");
526 gbt->transport_stream_receiver->RegisterRecvTrailingMetadata(
527 tx_code, [tx_code, gbs, gbt](
528 absl::StatusOr<grpc_binder::Metadata> trailing_metadata,
529 int status) {
530 grpc_core::ExecCtx exec_ctx;
531 gbs->recv_trailing_metadata_args.tx_code = tx_code;
532 gbs->recv_trailing_metadata_args.trailing_metadata =
533 std::move(trailing_metadata);
534 gbs->recv_trailing_metadata_args.status = status;
535 gbt->combiner->Run(
536 GRPC_CLOSURE_INIT(&gbs->recv_trailing_metadata_closure,
537 recv_trailing_metadata_locked,
538 &gbs->recv_trailing_metadata_args, nullptr),
539 GRPC_ERROR_NONE);
540 });
541 }
542 // Only send transaction when there's a send op presented.
543 absl::Status status = absl::OkStatus();
544 if (op->send_initial_metadata || op->send_message ||
545 op->send_trailing_metadata) {
546 // TODO(waynetu): RpcCall() is doing a lot of work (including waiting for
547 // acknowledgements from the other side). Consider delaying this operation
548 // with combiner.
549 status = gbt->wire_writer->RpcCall(tx);
550 if (!gbs->is_client && op->send_trailing_metadata) {
551 gbs->trailing_metadata_sent = true;
552 // According to transport explaineer - "Server extra: This op shouldn't
553 // actually be considered complete until the server has also sent trailing
554 // metadata to provide the other side with final status"
555 //
556 // Because we've done sending trailing metadata here, we can safely
557 // complete the recv_trailing_metadata callback here.
558 if (gbs->need_to_call_trailing_metadata_callback) {
559 grpc_closure* cb = gbs->recv_trailing_metadata_finished;
560 gbs->recv_trailing_metadata_finished = nullptr;
561 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
562 gbs->need_to_call_trailing_metadata_callback = false;
563 }
564 }
565 }
566 // Note that this should only be scheduled when all non-recv ops are
567 // completed
568 if (op->on_complete != nullptr) {
569 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
570 absl_status_to_grpc_error(status));
571 gpr_log(GPR_INFO, "on_complete closure schuduled");
572 }
573 GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
574 }
575
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)576 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
577 grpc_transport_stream_op_batch* op) {
578 GPR_TIMER_SCOPE("perform_stream_op", 0);
579 grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
580 grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
581 gpr_log(GPR_INFO, "%s = %p %p %p is_client = %d", __func__, gt, gs, op,
582 gbs->is_client);
583 GRPC_BINDER_STREAM_REF(gbs, "perform_stream_op");
584 op->handler_private.extra_arg = gbs;
585 gbt->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
586 perform_stream_op_locked, op, nullptr),
587 GRPC_ERROR_NONE);
588 }
589
close_transport_locked(grpc_binder_transport * gbt)590 static void close_transport_locked(grpc_binder_transport* gbt) {
591 gbt->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(),
592 "transport closed due to disconnection/goaway");
593 while (!gbt->registered_stream.empty()) {
594 cancel_stream_locked(
595 gbt, gbt->registered_stream.begin()->second,
596 grpc_error_set_int(
597 GRPC_ERROR_CREATE_FROM_STATIC_STRING("transport closed"),
598 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
599 }
600 }
601
perform_transport_op_locked(void * transport_op,grpc_error_handle)602 static void perform_transport_op_locked(void* transport_op,
603 grpc_error_handle /*error*/) {
604 grpc_transport_op* op = static_cast<grpc_transport_op*>(transport_op);
605 grpc_binder_transport* gbt =
606 static_cast<grpc_binder_transport*>(op->handler_private.extra_arg);
607 // TODO(waynetu): Should we lock here to avoid data race?
608 if (op->start_connectivity_watch != nullptr) {
609 gbt->state_tracker.AddWatcher(op->start_connectivity_watch_state,
610 std::move(op->start_connectivity_watch));
611 }
612 if (op->stop_connectivity_watch != nullptr) {
613 gbt->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
614 }
615 if (op->set_accept_stream) {
616 gbt->accept_stream_fn = op->set_accept_stream_fn;
617 gbt->accept_stream_user_data = op->set_accept_stream_user_data;
618 }
619 if (op->on_consumed) {
620 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
621 }
622 bool do_close = false;
623 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
624 do_close = true;
625 GRPC_ERROR_UNREF(op->disconnect_with_error);
626 }
627 if (op->goaway_error != GRPC_ERROR_NONE) {
628 do_close = true;
629 GRPC_ERROR_UNREF(op->goaway_error);
630 }
631 if (do_close) {
632 close_transport_locked(gbt);
633 }
634 GRPC_BINDER_UNREF_TRANSPORT(gbt, "perform_transport_op");
635 }
636
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)637 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
638 gpr_log(GPR_INFO, __func__);
639 grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
640 op->handler_private.extra_arg = gbt;
641 GRPC_BINDER_REF_TRANSPORT(gbt, "perform_transport_op");
642 gbt->combiner->Run(
643 GRPC_CLOSURE_INIT(&op->handler_private.closure,
644 perform_transport_op_locked, op, nullptr),
645 GRPC_ERROR_NONE);
646 }
647
destroy_stream_locked(void * sp,grpc_error_handle)648 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
649 grpc_binder_stream* gbs = static_cast<grpc_binder_stream*>(sp);
650 grpc_binder_transport* gbt = gbs->t;
651 cancel_stream_locked(
652 gbt, gbs,
653 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("destroy stream"),
654 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
655 gbs->~grpc_binder_stream();
656 }
657
destroy_stream(grpc_transport *,grpc_stream * gs,grpc_closure * then_schedule_closure)658 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
659 grpc_closure* then_schedule_closure) {
660 gpr_log(GPR_INFO, __func__);
661 grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
662 gbs->destroy_stream_then_closure = then_schedule_closure;
663 gbs->t->combiner->Run(GRPC_CLOSURE_INIT(&gbs->destroy_stream,
664 destroy_stream_locked, gbs, nullptr),
665 GRPC_ERROR_NONE);
666 }
667
destroy_transport_locked(void * gt,grpc_error_handle)668 static void destroy_transport_locked(void* gt, grpc_error_handle /*error*/) {
669 grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
670 close_transport_locked(gbt);
671 // Release the references held by the transport.
672 gbt->wire_reader = nullptr;
673 gbt->transport_stream_receiver = nullptr;
674 gbt->wire_writer = nullptr;
675 GRPC_BINDER_UNREF_TRANSPORT(gbt, "transport destroyed");
676 }
677
destroy_transport(grpc_transport * gt)678 static void destroy_transport(grpc_transport* gt) {
679 gpr_log(GPR_INFO, __func__);
680 grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
681 gbt->combiner->Run(
682 GRPC_CLOSURE_CREATE(destroy_transport_locked, gbt, nullptr),
683 GRPC_ERROR_NONE);
684 }
685
get_endpoint(grpc_transport *)686 static grpc_endpoint* get_endpoint(grpc_transport*) {
687 gpr_log(GPR_INFO, __func__);
688 return nullptr;
689 }
690
691 // See grpc_transport_vtable declaration for meaning of each field
692 static const grpc_transport_vtable vtable = {sizeof(grpc_binder_stream),
693 "binder",
694 init_stream,
695 set_pollset,
696 set_pollset_set,
697 perform_stream_op,
698 perform_transport_op,
699 destroy_stream,
700 destroy_transport,
701 get_endpoint};
702
get_vtable()703 static const grpc_transport_vtable* get_vtable() { return &vtable; }
704
accept_stream_locked(void * gt,grpc_error_handle)705 static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) {
706 grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
707 if (gbt->accept_stream_fn) {
708 // must pass in a non-null value.
709 (*gbt->accept_stream_fn)(gbt->accept_stream_user_data, &gbt->base, gbt);
710 }
711 }
712
grpc_binder_transport(std::unique_ptr<grpc_binder::Binder> binder,bool is_client,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)713 grpc_binder_transport::grpc_binder_transport(
714 std::unique_ptr<grpc_binder::Binder> binder, bool is_client,
715 std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)
716 : is_client(is_client),
717 combiner(grpc_combiner_create()),
718 state_tracker(
719 is_client ? "binder_transport_client" : "binder_transport_server",
720 GRPC_CHANNEL_READY),
721 refs(1, nullptr) {
722 gpr_log(GPR_INFO, __func__);
723 base.vtable = get_vtable();
724 GRPC_CLOSURE_INIT(&accept_stream_closure, accept_stream_locked, this,
725 nullptr);
726 transport_stream_receiver =
727 std::make_shared<grpc_binder::TransportStreamReceiverImpl>(
728 is_client, /*accept_stream_callback=*/[this] {
729 grpc_core::ExecCtx exec_ctx;
730 combiner->Run(&accept_stream_closure, GRPC_ERROR_NONE);
731 });
732 // WireReader holds a ref to grpc_binder_transport.
733 GRPC_BINDER_REF_TRANSPORT(this, "wire reader");
734 wire_reader = grpc_core::MakeOrphanable<grpc_binder::WireReaderImpl>(
735 transport_stream_receiver, is_client, security_policy,
736 /*on_destruct_callback=*/
737 [this] {
738 // Unref transport when destructed.
739 GRPC_BINDER_UNREF_TRANSPORT(this, "wire reader");
740 });
741 wire_writer = wire_reader->SetupTransport(std::move(binder));
742 }
743
~grpc_binder_transport()744 grpc_binder_transport::~grpc_binder_transport() {
745 GRPC_COMBINER_UNREF(combiner, "binder_transport");
746 }
747
grpc_create_binder_transport_client(std::unique_ptr<grpc_binder::Binder> endpoint_binder,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)748 grpc_transport* grpc_create_binder_transport_client(
749 std::unique_ptr<grpc_binder::Binder> endpoint_binder,
750 std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
751 security_policy) {
752 gpr_log(GPR_INFO, __func__);
753
754 GPR_ASSERT(endpoint_binder != nullptr);
755 GPR_ASSERT(security_policy != nullptr);
756
757 grpc_binder_transport* t = new grpc_binder_transport(
758 std::move(endpoint_binder), /*is_client=*/true, security_policy);
759
760 return &t->base;
761 }
762
grpc_create_binder_transport_server(std::unique_ptr<grpc_binder::Binder> client_binder,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)763 grpc_transport* grpc_create_binder_transport_server(
764 std::unique_ptr<grpc_binder::Binder> client_binder,
765 std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
766 security_policy) {
767 gpr_log(GPR_INFO, __func__);
768
769 GPR_ASSERT(client_binder != nullptr);
770 GPR_ASSERT(security_policy != nullptr);
771
772 grpc_binder_transport* t = new grpc_binder_transport(
773 std::move(client_binder), /*is_client=*/false, security_policy);
774
775 return &t->base;
776 }
777 #endif
778