1 // Copyright 2021 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GRPC_USE_EVENT_ENGINE
17 #include "absl/strings/string_view.h"
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/slice.h>
21 #include <grpc/slice_buffer.h>
22 #include <grpc/support/time.h>
23
24 #include "src/core/lib/address_utils/sockaddr_utils.h"
25 #include "src/core/lib/channel/channel_args.h"
26 #include "src/core/lib/iomgr/endpoint.h"
27 #include "src/core/lib/iomgr/error.h"
28 #include "src/core/lib/iomgr/event_engine/closure.h"
29 #include "src/core/lib/iomgr/event_engine/endpoint.h"
30 #include "src/core/lib/iomgr/event_engine/pollset.h"
31 #include "src/core/lib/iomgr/pollset.h"
32 #include "src/core/lib/iomgr/pollset_set.h"
33 #include "src/core/lib/iomgr/resource_quota.h"
34 #include "src/core/lib/transport/error_utils.h"
35
36 extern grpc_core::TraceFlag grpc_tcp_trace;
37
38 namespace {
39
40 using ::grpc_event_engine::experimental::EventEngine;
41 using ::grpc_event_engine::experimental::ResolvedAddressToURI;
42 using ::grpc_event_engine::experimental::SliceAllocator;
43 using ::grpc_event_engine::experimental::SliceBuffer;
44
endpoint_read(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool)45 void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
46 grpc_closure* cb, bool /* urgent */) {
47 auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
48 if (eeep->endpoint == nullptr) {
49 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_CANCELLED);
50 return;
51 }
52 SliceBuffer* read_buffer = new (&eeep->read_buffer) SliceBuffer(slices);
53 eeep->endpoint->Read(
54 [eeep, cb](absl::Status status) {
55 auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep->read_buffer);
56 read_buffer->~SliceBuffer();
57 grpc_core::ExecCtx exec_ctx;
58 grpc_core::Closure::Run(DEBUG_LOCATION, cb,
59 absl_status_to_grpc_error(status));
60 exec_ctx.Flush();
61 grpc_pollset_ee_broadcast_event();
62 },
63 read_buffer);
64 }
65
endpoint_write(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)66 void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
67 grpc_closure* cb, void* arg) {
68 // TODO(hork): adapt arg to some metrics collection mechanism.
69 (void)arg;
70 auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
71 if (eeep->endpoint == nullptr) {
72 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_CANCELLED);
73 return;
74 }
75 SliceBuffer* write_buffer = new (&eeep->write_buffer) SliceBuffer(slices);
76 eeep->endpoint->Write(
77 [eeep, cb](absl::Status status) {
78 auto* write_buffer =
79 reinterpret_cast<SliceBuffer*>(&eeep->write_buffer);
80 write_buffer->~SliceBuffer();
81 grpc_core::ExecCtx exec_ctx;
82 grpc_core::Closure::Run(DEBUG_LOCATION, cb,
83 absl_status_to_grpc_error(status));
84 exec_ctx.Flush();
85 grpc_pollset_ee_broadcast_event();
86 },
87 write_buffer);
88 }
endpoint_add_to_pollset(grpc_endpoint *,grpc_pollset *)89 void endpoint_add_to_pollset(grpc_endpoint* /* ep */,
90 grpc_pollset* /* pollset */) {}
endpoint_add_to_pollset_set(grpc_endpoint *,grpc_pollset_set *)91 void endpoint_add_to_pollset_set(grpc_endpoint* /* ep */,
92 grpc_pollset_set* /* pollset */) {}
endpoint_delete_from_pollset_set(grpc_endpoint *,grpc_pollset_set *)93 void endpoint_delete_from_pollset_set(grpc_endpoint* /* ep */,
94 grpc_pollset_set* /* pollset */) {}
95 /// After shutdown, all endpoint operations except destroy are no-op,
96 /// and will return some kind of sane default (empty strings, nullptrs, etc). It
97 /// is the caller's responsibility to ensure that calls to endpoint_shutdown are
98 /// synchronized.
endpoint_shutdown(grpc_endpoint * ep,grpc_error_handle why)99 void endpoint_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
100 auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
101 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
102 std::string str = grpc_error_std_string(why);
103 gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->endpoint.get(),
104 str.c_str());
105 }
106 eeep->endpoint.reset();
107 }
108
endpoint_destroy(grpc_endpoint * ep)109 void endpoint_destroy(grpc_endpoint* ep) {
110 auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
111 delete eeep;
112 }
113
endpoint_get_peer(grpc_endpoint * ep)114 absl::string_view endpoint_get_peer(grpc_endpoint* ep) {
115 auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
116 if (eeep->endpoint == nullptr) {
117 return "";
118 }
119 if (eeep->peer_address.empty()) {
120 const EventEngine::ResolvedAddress& addr = eeep->endpoint->GetPeerAddress();
121 eeep->peer_address = ResolvedAddressToURI(addr);
122 }
123 return eeep->peer_address;
124 }
125
endpoint_get_local_address(grpc_endpoint * ep)126 absl::string_view endpoint_get_local_address(grpc_endpoint* ep) {
127 auto* eeep = reinterpret_cast<grpc_event_engine_endpoint*>(ep);
128 if (eeep->endpoint == nullptr) {
129 return "";
130 }
131 if (eeep->local_address.empty()) {
132 const EventEngine::ResolvedAddress& addr =
133 eeep->endpoint->GetLocalAddress();
134 eeep->local_address = ResolvedAddressToURI(addr);
135 }
136 return eeep->local_address;
137 }
138
endpoint_get_fd(grpc_endpoint *)139 int endpoint_get_fd(grpc_endpoint* /* ep */) { return -1; }
140
endpoint_can_track_err(grpc_endpoint *)141 bool endpoint_can_track_err(grpc_endpoint* /* ep */) { return false; }
142
143 grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
144 endpoint_read,
145 endpoint_write,
146 endpoint_add_to_pollset,
147 endpoint_add_to_pollset_set,
148 endpoint_delete_from_pollset_set,
149 endpoint_shutdown,
150 endpoint_destroy,
151 endpoint_get_peer,
152 endpoint_get_local_address,
153 endpoint_get_fd,
154 endpoint_can_track_err};
155
156 } // namespace
157
grpc_tcp_server_endpoint_create(std::unique_ptr<EventEngine::Endpoint> ee_endpoint)158 grpc_event_engine_endpoint* grpc_tcp_server_endpoint_create(
159 std::unique_ptr<EventEngine::Endpoint> ee_endpoint) {
160 auto endpoint = new grpc_event_engine_endpoint;
161 endpoint->base.vtable = &grpc_event_engine_endpoint_vtable;
162 endpoint->endpoint = std::move(ee_endpoint);
163 return endpoint;
164 }
165
grpc_tcp_create(const grpc_channel_args * channel_args,absl::string_view peer_address)166 grpc_endpoint* grpc_tcp_create(const grpc_channel_args* channel_args,
167 absl::string_view peer_address) {
168 auto endpoint = new grpc_event_engine_endpoint;
169 endpoint->base.vtable = &grpc_event_engine_endpoint_vtable;
170 return &endpoint->base;
171 }
172
173 #endif // GRPC_USE_EVENT_ENGINE
174