1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/ext/transport/binder/transport/binder_transport.h"
18
19 #ifndef GRPC_NO_BINDER
20
21 #include <cstdint>
22 #include <memory>
23 #include <string>
24 #include <utility>
25
26 #include "absl/memory/memory.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/substitute.h"
29
30 #include <grpc/support/log.h>
31
32 #include "src/core/ext/transport/binder/transport/binder_stream.h"
33 #include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
34 #include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h"
35 #include "src/core/ext/transport/binder/wire_format/wire_reader.h"
36 #include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
37 #include "src/core/ext/transport/binder/wire_format/wire_writer.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/slice/slice_utils.h"
40 #include "src/core/lib/transport/byte_stream.h"
41 #include "src/core/lib/transport/error_utils.h"
42 #include "src/core/lib/transport/metadata_batch.h"
43 #include "src/core/lib/transport/static_metadata.h"
44 #include "src/core/lib/transport/status_metadata.h"
45 #include "src/core/lib/transport/transport.h"
46
47 #ifndef NDEBUG
grpc_binder_stream_ref(grpc_binder_stream * s,const char * reason)48 static void grpc_binder_stream_ref(grpc_binder_stream* s, const char* reason) {
49 grpc_stream_ref(s->refcount, reason);
50 }
grpc_binder_stream_unref(grpc_binder_stream * s,const char * reason)51 static void grpc_binder_stream_unref(grpc_binder_stream* s,
52 const char* reason) {
53 grpc_stream_unref(s->refcount, reason);
54 }
grpc_binder_ref_transport(grpc_binder_transport * t,const char * reason,const char * file,int line)55 static void grpc_binder_ref_transport(grpc_binder_transport* t,
56 const char* reason, const char* file,
57 int line) {
58 t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
59 }
grpc_binder_unref_transport(grpc_binder_transport * t,const char * reason,const char * file,int line)60 static void grpc_binder_unref_transport(grpc_binder_transport* t,
61 const char* reason, const char* file,
62 int line) {
63 if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
64 delete t;
65 }
66 }
67 #else
grpc_binder_stream_ref(grpc_binder_stream * s)68 static void grpc_binder_stream_ref(grpc_binder_stream* s) {
69 grpc_stream_ref(s->refcount);
70 }
grpc_binder_stream_unref(grpc_binder_stream * s)71 static void grpc_binder_stream_unref(grpc_binder_stream* s) {
72 grpc_stream_unref(s->refcount);
73 }
grpc_binder_ref_transport(grpc_binder_transport * t)74 static void grpc_binder_ref_transport(grpc_binder_transport* t) {
75 t->refs.Ref();
76 }
grpc_binder_unref_transport(grpc_binder_transport * t)77 static void grpc_binder_unref_transport(grpc_binder_transport* t) {
78 if (t->refs.Unref()) {
79 delete t;
80 }
81 }
82 #endif
83
84 #ifndef NDEBUG
85 #define GRPC_BINDER_STREAM_REF(stream, reason) \
86 grpc_binder_stream_ref(stream, reason)
87 #define GRPC_BINDER_STREAM_UNREF(stream, reason) \
88 grpc_binder_stream_unref(stream, reason)
89 #define GRPC_BINDER_REF_TRANSPORT(t, r) \
90 grpc_binder_ref_transport(t, r, __FILE__, __LINE__)
91 #define GRPC_BINDER_UNREF_TRANSPORT(t, r) \
92 grpc_binder_unref_transport(t, r, __FILE__, __LINE__)
93 #else
94 #define GRPC_BINDER_STREAM_REF(stream, reason) grpc_binder_stream_ref(stream)
95 #define GRPC_BINDER_STREAM_UNREF(stream, reason) \
96 grpc_binder_stream_unref(stream)
97 #define GRPC_BINDER_REF_TRANSPORT(t, r) grpc_binder_ref_transport(t)
98 #define GRPC_BINDER_UNREF_TRANSPORT(t, r) grpc_binder_unref_transport(t)
99 #endif
100
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)101 static int init_stream(grpc_transport* gt, grpc_stream* gs,
102 grpc_stream_refcount* refcount, const void* server_data,
103 grpc_core::Arena* arena) {
104 GPR_TIMER_SCOPE("init_stream", 0);
105 gpr_log(GPR_INFO, "%s = %p %p %p %p %p", __func__, gt, gs, refcount,
106 server_data, arena);
107 grpc_binder_transport* t = reinterpret_cast<grpc_binder_transport*>(gt);
108 // TODO(mingcl): Figure out if we need to worry about concurrent invocation
109 // here
110 new (gs) grpc_binder_stream(t, refcount, server_data, arena,
111 t->NewStreamTxCode(), t->is_client);
112 return 0;
113 }
114
set_pollset(grpc_transport * gt,grpc_stream * gs,grpc_pollset * gp)115 static void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* gp) {
116 gpr_log(GPR_INFO, "%s = %p %p %p", __func__, gt, gs, gp);
117 }
118
set_pollset_set(grpc_transport *,grpc_stream *,grpc_pollset_set *)119 static void set_pollset_set(grpc_transport*, grpc_stream*, grpc_pollset_set*) {
120 gpr_log(GPR_INFO, __func__);
121 }
122
AssignMetadata(grpc_metadata_batch * mb,const grpc_binder::Metadata & md)123 static void AssignMetadata(grpc_metadata_batch* mb,
124 const grpc_binder::Metadata& md) {
125 mb->Clear();
126 for (auto& p : md) {
127 mb->Append(p.first, grpc_slice_from_cpp_string(p.second));
128 }
129 }
130
cancel_stream_locked(grpc_binder_transport * gbt,grpc_binder_stream * gbs,grpc_error_handle error)131 static void cancel_stream_locked(grpc_binder_transport* gbt,
132 grpc_binder_stream* gbs,
133 grpc_error_handle error) {
134 gpr_log(GPR_INFO, "cancel_stream_locked");
135 if (!gbs->is_closed) {
136 GPR_ASSERT(gbs->cancel_self_error == GRPC_ERROR_NONE);
137 gbs->is_closed = true;
138 gbs->cancel_self_error = GRPC_ERROR_REF(error);
139 gbt->transport_stream_receiver->CancelStream(gbs->tx_code);
140 gbt->registered_stream.erase(gbs->tx_code);
141 if (gbs->recv_initial_metadata_ready != nullptr) {
142 grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_initial_metadata_ready,
143 GRPC_ERROR_REF(error));
144 gbs->recv_initial_metadata_ready = nullptr;
145 gbs->recv_initial_metadata = nullptr;
146 gbs->trailing_metadata_available = nullptr;
147 }
148 if (gbs->recv_message_ready != nullptr) {
149 grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_message_ready,
150 GRPC_ERROR_REF(error));
151 gbs->recv_message_ready = nullptr;
152 gbs->recv_message->reset();
153 gbs->recv_message = nullptr;
154 gbs->call_failed_before_recv_message = nullptr;
155 }
156 if (gbs->recv_trailing_metadata_finished != nullptr) {
157 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
158 gbs->recv_trailing_metadata_finished,
159 GRPC_ERROR_REF(error));
160 gbs->recv_trailing_metadata_finished = nullptr;
161 gbs->recv_trailing_metadata = nullptr;
162 }
163 }
164 GRPC_ERROR_UNREF(error);
165 }
166
ContainsAuthorityAndPath(const grpc_binder::Metadata & metadata)167 static bool ContainsAuthorityAndPath(const grpc_binder::Metadata& metadata) {
168 bool has_authority = false;
169 bool has_path = false;
170 for (const auto& kv : metadata) {
171 if (kv.first == grpc_core::StringViewFromSlice(GRPC_MDSTR_AUTHORITY)) {
172 has_authority = true;
173 }
174 if (kv.first == grpc_core::StringViewFromSlice(GRPC_MDSTR_PATH)) {
175 has_path = true;
176 }
177 }
178 return has_authority && has_path;
179 }
180
recv_initial_metadata_locked(void * arg,grpc_error_handle)181 static void recv_initial_metadata_locked(void* arg,
182 grpc_error_handle /*error*/) {
183 RecvInitialMetadataArgs* args = static_cast<RecvInitialMetadataArgs*>(arg);
184 grpc_binder_stream* gbs = args->gbs;
185
186 gpr_log(GPR_INFO,
187 "recv_initial_metadata_locked is_client = %d is_closed = %d",
188 gbs->is_client, gbs->is_closed);
189
190 if (!gbs->is_closed) {
191 grpc_error_handle error = [&] {
192 GPR_ASSERT(gbs->recv_initial_metadata);
193 GPR_ASSERT(gbs->recv_initial_metadata_ready);
194 if (!args->initial_metadata.ok()) {
195 gpr_log(GPR_ERROR, "Failed to parse initial metadata");
196 return absl_status_to_grpc_error(args->initial_metadata.status());
197 }
198 if (!gbs->is_client) {
199 // For server, we expect :authority and :path in initial metadata.
200 if (!ContainsAuthorityAndPath(*args->initial_metadata)) {
201 return GRPC_ERROR_CREATE_FROM_CPP_STRING(
202 "Missing :authority or :path in initial metadata");
203 }
204 }
205 AssignMetadata(gbs->recv_initial_metadata, *args->initial_metadata);
206 return GRPC_ERROR_NONE;
207 }();
208
209 grpc_closure* cb = gbs->recv_initial_metadata_ready;
210 gbs->recv_initial_metadata_ready = nullptr;
211 gbs->recv_initial_metadata = nullptr;
212 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
213 }
214 GRPC_BINDER_STREAM_UNREF(gbs, "recv_initial_metadata");
215 }
216
recv_message_locked(void * arg,grpc_error_handle)217 static void recv_message_locked(void* arg, grpc_error_handle /*error*/) {
218 RecvMessageArgs* args = static_cast<RecvMessageArgs*>(arg);
219 grpc_binder_stream* gbs = args->gbs;
220
221 gpr_log(GPR_INFO, "recv_message_locked is_client = %d is_closed = %d",
222 gbs->is_client, gbs->is_closed);
223
224 if (!gbs->is_closed) {
225 grpc_error_handle error = [&] {
226 GPR_ASSERT(gbs->recv_message);
227 GPR_ASSERT(gbs->recv_message_ready);
228 if (!args->message.ok()) {
229 gpr_log(GPR_ERROR, "Failed to receive message");
230 if (args->message.status().message() ==
231 grpc_binder::TransportStreamReceiver::
232 kGrpcBinderTransportCancelledGracefully) {
233 gpr_log(GPR_ERROR, "message cancelled gracefully");
234 // Cancelled because we've already received trailing metadata.
235 // It's not an error in this case.
236 return GRPC_ERROR_NONE;
237 } else {
238 return absl_status_to_grpc_error(args->message.status());
239 }
240 }
241 grpc_slice_buffer buf;
242 grpc_slice_buffer_init(&buf);
243 grpc_slice_buffer_add(&buf, grpc_slice_from_cpp_string(*args->message));
244
245 gbs->sbs.Init(&buf, 0);
246 gbs->recv_message->reset(gbs->sbs.get());
247 return GRPC_ERROR_NONE;
248 }();
249
250 if (error != GRPC_ERROR_NONE &&
251 gbs->call_failed_before_recv_message != nullptr) {
252 *gbs->call_failed_before_recv_message = true;
253 }
254 grpc_closure* cb = gbs->recv_message_ready;
255 gbs->recv_message_ready = nullptr;
256 gbs->recv_message = nullptr;
257 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
258 }
259
260 GRPC_BINDER_STREAM_UNREF(gbs, "recv_message");
261 }
262
recv_trailing_metadata_locked(void * arg,grpc_error_handle)263 static void recv_trailing_metadata_locked(void* arg,
264 grpc_error_handle /*error*/) {
265 RecvTrailingMetadataArgs* args = static_cast<RecvTrailingMetadataArgs*>(arg);
266 grpc_binder_stream* gbs = args->gbs;
267
268 gpr_log(GPR_INFO,
269 "recv_trailing_metadata_locked is_client = %d is_closed = %d",
270 gbs->is_client, gbs->is_closed);
271
272 if (!gbs->is_closed) {
273 grpc_error_handle error = [&] {
274 GPR_ASSERT(gbs->recv_trailing_metadata);
275 GPR_ASSERT(gbs->recv_trailing_metadata_finished);
276 if (!args->trailing_metadata.ok()) {
277 gpr_log(GPR_ERROR, "Failed to receive trailing metadata");
278 return absl_status_to_grpc_error(args->trailing_metadata.status());
279 }
280 if (!gbs->is_client) {
281 // Client will not send non-empty trailing metadata.
282 if (!args->trailing_metadata.value().empty()) {
283 gpr_log(GPR_ERROR, "Server receives non-empty trailing metadata.");
284 return GRPC_ERROR_CANCELLED;
285 }
286 } else {
287 AssignMetadata(gbs->recv_trailing_metadata, *args->trailing_metadata);
288 // Append status to metadata
289 // TODO(b/192208695): See if we can avoid to manually put status
290 // code into the header
291 gpr_log(GPR_INFO, "status = %d", args->status);
292 grpc_linked_mdelem* glm = static_cast<grpc_linked_mdelem*>(
293 gbs->arena->Alloc(sizeof(grpc_linked_mdelem)));
294 glm->md = grpc_get_reffed_status_elem(args->status);
295 GPR_ASSERT(gbs->recv_trailing_metadata->LinkTail(glm) ==
296 GRPC_ERROR_NONE);
297 gpr_log(GPR_INFO, "trailing_metadata = %p",
298 gbs->recv_trailing_metadata);
299 gpr_log(GPR_INFO, "glm = %p", glm);
300 }
301 return GRPC_ERROR_NONE;
302 }();
303
304 if (gbs->is_client || gbs->trailing_metadata_sent) {
305 grpc_closure* cb = gbs->recv_trailing_metadata_finished;
306 gbs->recv_trailing_metadata_finished = nullptr;
307 gbs->recv_trailing_metadata = nullptr;
308 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
309 } else {
310 // According to transport explaineer - "Server extra: This op shouldn't
311 // actually be considered complete until the server has also sent trailing
312 // metadata to provide the other side with final status"
313 //
314 // We haven't sent trailing metadata yet, so we have to delay completing
315 // the recv_trailing_metadata callback.
316 gbs->need_to_call_trailing_metadata_callback = true;
317 }
318 }
319 GRPC_BINDER_STREAM_UNREF(gbs, "recv_trailing_metadata");
320 }
321
perform_stream_op_locked(void * stream_op,grpc_error_handle)322 static void perform_stream_op_locked(void* stream_op,
323 grpc_error_handle /*error*/) {
324 grpc_transport_stream_op_batch* op =
325 static_cast<grpc_transport_stream_op_batch*>(stream_op);
326 grpc_binder_stream* gbs =
327 static_cast<grpc_binder_stream*>(op->handler_private.extra_arg);
328 grpc_binder_transport* gbt = gbs->t;
329 if (op->cancel_stream) {
330 // TODO(waynetu): Is this true?
331 GPR_ASSERT(!op->send_initial_metadata && !op->send_message &&
332 !op->send_trailing_metadata && !op->recv_initial_metadata &&
333 !op->recv_message && !op->recv_trailing_metadata);
334 gpr_log(GPR_INFO, "cancel_stream is_client = %d", gbs->is_client);
335 if (!gbs->is_client) {
336 // Send trailing metadata to inform the other end about the cancellation,
337 // regardless if we'd already done that or not.
338 grpc_binder::Transaction cancel_tx(gbs->GetTxCode(), gbt->is_client);
339 cancel_tx.SetSuffix(grpc_binder::Metadata{});
340 cancel_tx.SetStatus(1);
341 absl::Status status = gbt->wire_writer->RpcCall(cancel_tx);
342 }
343 cancel_stream_locked(gbt, gbs, op->payload->cancel_stream.cancel_error);
344 if (op->on_complete != nullptr) {
345 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, GRPC_ERROR_NONE);
346 }
347 GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
348 return;
349 }
350
351 if (gbs->is_closed) {
352 if (op->send_message) {
353 // Reset the send_message payload to prevent memory leaks.
354 op->payload->send_message.send_message.reset();
355 }
356 if (op->recv_initial_metadata) {
357 grpc_core::ExecCtx::Run(
358 DEBUG_LOCATION,
359 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
360 GRPC_ERROR_REF(gbs->cancel_self_error));
361 }
362 if (op->recv_message) {
363 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
364 op->payload->recv_message.recv_message_ready,
365 GRPC_ERROR_REF(gbs->cancel_self_error));
366 }
367 if (op->recv_trailing_metadata) {
368 grpc_core::ExecCtx::Run(
369 DEBUG_LOCATION,
370 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
371 GRPC_ERROR_REF(gbs->cancel_self_error));
372 }
373 if (op->on_complete != nullptr) {
374 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
375 GRPC_ERROR_REF(gbs->cancel_self_error));
376 }
377 GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
378 return;
379 }
380
381 int tx_code = gbs->tx_code;
382 grpc_binder::Transaction tx(tx_code, gbt->is_client);
383
384 if (op->send_initial_metadata) {
385 gpr_log(GPR_INFO, "send_initial_metadata");
386 grpc_binder::Metadata init_md;
387 auto batch = op->payload->send_initial_metadata.send_initial_metadata;
388
389 batch->ForEach([&](grpc_mdelem md) {
390 absl::string_view key = grpc_core::StringViewFromSlice(GRPC_MDKEY(md));
391 absl::string_view value =
392 grpc_core::StringViewFromSlice(GRPC_MDVALUE(md));
393 gpr_log(GPR_INFO, "send initial metatday key-value %s",
394 absl::StrCat(key, " ", value).c_str());
395 if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
396 // TODO(b/192208403): Figure out if it is correct to simply drop '/'
397 // prefix and treat it as rpc method name
398 GPR_ASSERT(value[0] == '/');
399 std::string path = std::string(value).substr(1);
400
401 // Only client send method ref.
402 GPR_ASSERT(gbt->is_client);
403 tx.SetMethodRef(path);
404 } else {
405 init_md.emplace_back(std::string(key), std::string(value));
406 }
407 });
408 tx.SetPrefix(init_md);
409 }
410 if (op->send_message) {
411 gpr_log(GPR_INFO, "send_message");
412 size_t remaining = op->payload->send_message.send_message->length();
413 std::string message_data;
414 while (remaining > 0) {
415 grpc_slice message_slice;
416 // TODO(waynetu): Temporarily assume that the message is ready.
417 GPR_ASSERT(
418 op->payload->send_message.send_message->Next(SIZE_MAX, nullptr));
419 grpc_error_handle error =
420 op->payload->send_message.send_message->Pull(&message_slice);
421 // TODO(waynetu): Cancel the stream if error is not GRPC_ERROR_NONE.
422 GPR_ASSERT(error == GRPC_ERROR_NONE);
423 uint8_t* p = GRPC_SLICE_START_PTR(message_slice);
424 size_t len = GRPC_SLICE_LENGTH(message_slice);
425 remaining -= len;
426 message_data += std::string(reinterpret_cast<char*>(p), len);
427 grpc_slice_unref_internal(message_slice);
428 }
429 gpr_log(GPR_INFO, "message_data = %s", message_data.c_str());
430 tx.SetData(message_data);
431 // TODO(b/192369787): Are we supposed to reset here to avoid
432 // use-after-free issue in call.cc?
433 op->payload->send_message.send_message.reset();
434 }
435
436 if (op->send_trailing_metadata) {
437 gpr_log(GPR_INFO, "send_trailing_metadata");
438 auto batch = op->payload->send_trailing_metadata.send_trailing_metadata;
439 grpc_binder::Metadata trailing_metadata;
440
441 batch->ForEach([&](grpc_mdelem md) {
442 // Client will not send trailing metadata.
443 GPR_ASSERT(!gbt->is_client);
444
445 if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS)) {
446 int status = grpc_get_status_code_from_metadata(md);
447 gpr_log(GPR_INFO, "send trailing metadata status = %d", status);
448 tx.SetStatus(status);
449 } else {
450 absl::string_view key = grpc_core::StringViewFromSlice(GRPC_MDKEY(md));
451 absl::string_view value =
452 grpc_core::StringViewFromSlice(GRPC_MDVALUE(md));
453 gpr_log(GPR_INFO, "send trailing metatday key-value %s",
454 absl::StrCat(key, " ", value).c_str());
455 trailing_metadata.emplace_back(std::string(key), std::string(value));
456 }
457 });
458 // TODO(mingcl): Will we ever has key-value pair here? According to
459 // wireformat client suffix data is always empty.
460 tx.SetSuffix(trailing_metadata);
461 }
462 if (op->recv_initial_metadata) {
463 gpr_log(GPR_INFO, "recv_initial_metadata");
464 gbs->recv_initial_metadata_ready =
465 op->payload->recv_initial_metadata.recv_initial_metadata_ready;
466 gbs->recv_initial_metadata =
467 op->payload->recv_initial_metadata.recv_initial_metadata;
468 gbs->trailing_metadata_available =
469 op->payload->recv_initial_metadata.trailing_metadata_available;
470 GRPC_BINDER_STREAM_REF(gbs, "recv_initial_metadata");
471 gbt->transport_stream_receiver->RegisterRecvInitialMetadata(
472 tx_code, [tx_code, gbs,
473 gbt](absl::StatusOr<grpc_binder::Metadata> initial_metadata) {
474 grpc_core::ExecCtx exec_ctx;
475 gbs->recv_initial_metadata_args.tx_code = tx_code;
476 gbs->recv_initial_metadata_args.initial_metadata =
477 std::move(initial_metadata);
478 gbt->combiner->Run(
479 GRPC_CLOSURE_INIT(&gbs->recv_initial_metadata_closure,
480 recv_initial_metadata_locked,
481 &gbs->recv_initial_metadata_args, nullptr),
482 GRPC_ERROR_NONE);
483 });
484 }
485 if (op->recv_message) {
486 gpr_log(GPR_INFO, "recv_message");
487 gbs->recv_message_ready = op->payload->recv_message.recv_message_ready;
488 gbs->recv_message = op->payload->recv_message.recv_message;
489 gbs->call_failed_before_recv_message =
490 op->payload->recv_message.call_failed_before_recv_message;
491 GRPC_BINDER_STREAM_REF(gbs, "recv_message");
492 gbt->transport_stream_receiver->RegisterRecvMessage(
493 tx_code, [tx_code, gbs, gbt](absl::StatusOr<std::string> message) {
494 grpc_core::ExecCtx exec_ctx;
495 gbs->recv_message_args.tx_code = tx_code;
496 gbs->recv_message_args.message = std::move(message);
497 gbt->combiner->Run(
498 GRPC_CLOSURE_INIT(&gbs->recv_message_closure, recv_message_locked,
499 &gbs->recv_message_args, nullptr),
500 GRPC_ERROR_NONE);
501 });
502 }
503 if (op->recv_trailing_metadata) {
504 gpr_log(GPR_INFO, "recv_trailing_metadata");
505 gbs->recv_trailing_metadata_finished =
506 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
507 gbs->recv_trailing_metadata =
508 op->payload->recv_trailing_metadata.recv_trailing_metadata;
509 GRPC_BINDER_STREAM_REF(gbs, "recv_trailing_metadata");
510 gbt->transport_stream_receiver->RegisterRecvTrailingMetadata(
511 tx_code, [tx_code, gbs, gbt](
512 absl::StatusOr<grpc_binder::Metadata> trailing_metadata,
513 int status) {
514 grpc_core::ExecCtx exec_ctx;
515 gbs->recv_trailing_metadata_args.tx_code = tx_code;
516 gbs->recv_trailing_metadata_args.trailing_metadata =
517 std::move(trailing_metadata);
518 gbs->recv_trailing_metadata_args.status = status;
519 gbt->combiner->Run(
520 GRPC_CLOSURE_INIT(&gbs->recv_trailing_metadata_closure,
521 recv_trailing_metadata_locked,
522 &gbs->recv_trailing_metadata_args, nullptr),
523 GRPC_ERROR_NONE);
524 });
525 }
526 // Only send transaction when there's a send op presented.
527 absl::Status status = absl::OkStatus();
528 if (op->send_initial_metadata || op->send_message ||
529 op->send_trailing_metadata) {
530 // TODO(waynetu): RpcCall() is doing a lot of work (including waiting for
531 // acknowledgements from the other side). Consider delaying this operation
532 // with combiner.
533 status = gbt->wire_writer->RpcCall(tx);
534 if (!gbs->is_client && op->send_trailing_metadata) {
535 gbs->trailing_metadata_sent = true;
536 // According to transport explaineer - "Server extra: This op shouldn't
537 // actually be considered complete until the server has also sent trailing
538 // metadata to provide the other side with final status"
539 //
540 // Because we've done sending trailing metadata here, we can safely
541 // complete the recv_trailing_metadata callback here.
542 if (gbs->need_to_call_trailing_metadata_callback) {
543 grpc_closure* cb = gbs->recv_trailing_metadata_finished;
544 gbs->recv_trailing_metadata_finished = nullptr;
545 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
546 gbs->need_to_call_trailing_metadata_callback = false;
547 }
548 }
549 }
550 // Note that this should only be scheduled when all non-recv ops are
551 // completed
552 if (op->on_complete != nullptr) {
553 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
554 absl_status_to_grpc_error(status));
555 gpr_log(GPR_INFO, "on_complete closure schuduled");
556 }
557 GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
558 }
559
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)560 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
561 grpc_transport_stream_op_batch* op) {
562 GPR_TIMER_SCOPE("perform_stream_op", 0);
563 grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
564 grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
565 gpr_log(GPR_INFO, "%s = %p %p %p is_client = %d", __func__, gt, gs, op,
566 gbs->is_client);
567 GRPC_BINDER_STREAM_REF(gbs, "perform_stream_op");
568 op->handler_private.extra_arg = gbs;
569 gbt->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
570 perform_stream_op_locked, op, nullptr),
571 GRPC_ERROR_NONE);
572 }
573
close_transport_locked(grpc_binder_transport * gbt)574 static void close_transport_locked(grpc_binder_transport* gbt) {
575 gbt->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(),
576 "transport closed due to disconnection/goaway");
577 while (!gbt->registered_stream.empty()) {
578 cancel_stream_locked(
579 gbt, gbt->registered_stream.begin()->second,
580 grpc_error_set_int(
581 GRPC_ERROR_CREATE_FROM_STATIC_STRING("transport closed"),
582 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
583 }
584 }
585
perform_transport_op_locked(void * transport_op,grpc_error_handle)586 static void perform_transport_op_locked(void* transport_op,
587 grpc_error_handle /*error*/) {
588 grpc_transport_op* op = static_cast<grpc_transport_op*>(transport_op);
589 grpc_binder_transport* gbt =
590 static_cast<grpc_binder_transport*>(op->handler_private.extra_arg);
591 // TODO(waynetu): Should we lock here to avoid data race?
592 if (op->start_connectivity_watch != nullptr) {
593 gbt->state_tracker.AddWatcher(op->start_connectivity_watch_state,
594 std::move(op->start_connectivity_watch));
595 }
596 if (op->stop_connectivity_watch != nullptr) {
597 gbt->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
598 }
599 if (op->set_accept_stream) {
600 gbt->accept_stream_fn = op->set_accept_stream_fn;
601 gbt->accept_stream_user_data = op->set_accept_stream_user_data;
602 }
603 if (op->on_consumed) {
604 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
605 }
606 bool do_close = false;
607 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
608 do_close = true;
609 GRPC_ERROR_UNREF(op->disconnect_with_error);
610 }
611 if (op->goaway_error != GRPC_ERROR_NONE) {
612 do_close = true;
613 GRPC_ERROR_UNREF(op->goaway_error);
614 }
615 if (do_close) {
616 close_transport_locked(gbt);
617 }
618 GRPC_BINDER_UNREF_TRANSPORT(gbt, "perform_transport_op");
619 }
620
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)621 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
622 gpr_log(GPR_INFO, __func__);
623 grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
624 op->handler_private.extra_arg = gbt;
625 GRPC_BINDER_REF_TRANSPORT(gbt, "perform_transport_op");
626 gbt->combiner->Run(
627 GRPC_CLOSURE_INIT(&op->handler_private.closure,
628 perform_transport_op_locked, op, nullptr),
629 GRPC_ERROR_NONE);
630 }
631
destroy_stream_locked(void * sp,grpc_error_handle)632 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
633 grpc_binder_stream* gbs = static_cast<grpc_binder_stream*>(sp);
634 grpc_binder_transport* gbt = gbs->t;
635 cancel_stream_locked(
636 gbt, gbs,
637 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("destroy stream"),
638 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
639 gbs->~grpc_binder_stream();
640 }
641
destroy_stream(grpc_transport *,grpc_stream * gs,grpc_closure * then_schedule_closure)642 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
643 grpc_closure* then_schedule_closure) {
644 gpr_log(GPR_INFO, __func__);
645 grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
646 gbs->destroy_stream_then_closure = then_schedule_closure;
647 gbs->t->combiner->Run(GRPC_CLOSURE_INIT(&gbs->destroy_stream,
648 destroy_stream_locked, gbs, nullptr),
649 GRPC_ERROR_NONE);
650 }
651
destroy_transport_locked(void * gt,grpc_error_handle)652 static void destroy_transport_locked(void* gt, grpc_error_handle /*error*/) {
653 grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
654 close_transport_locked(gbt);
655 // Release the references held by the transport.
656 gbt->wire_reader = nullptr;
657 gbt->transport_stream_receiver = nullptr;
658 gbt->wire_writer = nullptr;
659 GRPC_BINDER_UNREF_TRANSPORT(gbt, "transport destroyed");
660 }
661
destroy_transport(grpc_transport * gt)662 static void destroy_transport(grpc_transport* gt) {
663 gpr_log(GPR_INFO, __func__);
664 grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
665 gbt->combiner->Run(
666 GRPC_CLOSURE_CREATE(destroy_transport_locked, gbt, nullptr),
667 GRPC_ERROR_NONE);
668 }
669
get_endpoint(grpc_transport *)670 static grpc_endpoint* get_endpoint(grpc_transport*) {
671 gpr_log(GPR_INFO, __func__);
672 return nullptr;
673 }
674
675 // See grpc_transport_vtable declaration for meaning of each field
676 static const grpc_transport_vtable vtable = {sizeof(grpc_binder_stream),
677 "binder",
678 init_stream,
679 set_pollset,
680 set_pollset_set,
681 perform_stream_op,
682 perform_transport_op,
683 destroy_stream,
684 destroy_transport,
685 get_endpoint};
686
get_vtable()687 static const grpc_transport_vtable* get_vtable() { return &vtable; }
688
accept_stream_locked(void * gt,grpc_error_handle)689 static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) {
690 grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
691 if (gbt->accept_stream_fn) {
692 // must pass in a non-null value.
693 (*gbt->accept_stream_fn)(gbt->accept_stream_user_data, &gbt->base, gbt);
694 }
695 }
696
grpc_binder_transport(std::unique_ptr<grpc_binder::Binder> binder,bool is_client,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)697 grpc_binder_transport::grpc_binder_transport(
698 std::unique_ptr<grpc_binder::Binder> binder, bool is_client,
699 std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)
700 : is_client(is_client),
701 combiner(grpc_combiner_create()),
702 state_tracker(is_client ? "binder_transport_client"
703 : "binder_transport_server"),
704 refs(1, nullptr) {
705 gpr_log(GPR_INFO, __func__);
706 base.vtable = get_vtable();
707 GRPC_CLOSURE_INIT(&accept_stream_closure, accept_stream_locked, this,
708 nullptr);
709 transport_stream_receiver =
710 std::make_shared<grpc_binder::TransportStreamReceiverImpl>(
711 is_client, /*accept_stream_callback=*/[this] {
712 grpc_core::ExecCtx exec_ctx;
713 combiner->Run(&accept_stream_closure, GRPC_ERROR_NONE);
714 });
715 // WireReader holds a ref to grpc_binder_transport.
716 GRPC_BINDER_REF_TRANSPORT(this, "wire reader");
717 wire_reader = grpc_core::MakeOrphanable<grpc_binder::WireReaderImpl>(
718 transport_stream_receiver, is_client, security_policy,
719 /*on_destruct_callback=*/
720 [this] {
721 // Unref transport when destructed.
722 GRPC_BINDER_UNREF_TRANSPORT(this, "wire reader");
723 });
724 wire_writer = wire_reader->SetupTransport(std::move(binder));
725 }
726
~grpc_binder_transport()727 grpc_binder_transport::~grpc_binder_transport() {
728 GRPC_COMBINER_UNREF(combiner, "binder_transport");
729 }
730
grpc_create_binder_transport_client(std::unique_ptr<grpc_binder::Binder> endpoint_binder,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)731 grpc_transport* grpc_create_binder_transport_client(
732 std::unique_ptr<grpc_binder::Binder> endpoint_binder,
733 std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
734 security_policy) {
735 gpr_log(GPR_INFO, __func__);
736
737 GPR_ASSERT(endpoint_binder != nullptr);
738 GPR_ASSERT(security_policy != nullptr);
739
740 grpc_binder_transport* t = new grpc_binder_transport(
741 std::move(endpoint_binder), /*is_client=*/true, security_policy);
742
743 return &t->base;
744 }
745
grpc_create_binder_transport_server(std::unique_ptr<grpc_binder::Binder> client_binder,std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)746 grpc_transport* grpc_create_binder_transport_server(
747 std::unique_ptr<grpc_binder::Binder> client_binder,
748 std::shared_ptr<grpc::experimental::binder::SecurityPolicy>
749 security_policy) {
750 gpr_log(GPR_INFO, __func__);
751
752 GPR_ASSERT(client_binder != nullptr);
753 GPR_ASSERT(security_policy != nullptr);
754
755 grpc_binder_transport* t = new grpc_binder_transport(
756 std::move(client_binder), /*is_client=*/false, security_policy);
757
758 return &t->base;
759 }
760 #endif
761