1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/transport/transport.h"
22
23 #include <string.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/atm.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/sync.h>
29
30 #include "src/core/lib/gpr/alloc.h"
31 #include "src/core/lib/gpr/string.h"
32 #include "src/core/lib/gprpp/memory.h"
33 #include "src/core/lib/iomgr/executor.h"
34 #include "src/core/lib/iomgr/iomgr.h"
35 #include "src/core/lib/slice/slice_internal.h"
36 #include "src/core/lib/slice/slice_string_helpers.h"
37 #include "src/core/lib/transport/transport_impl.h"
38
// Trace flag (debug-only builds) for logging stream refcount changes;
// enable with GRPC_TRACE=stream_refcount.
grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount(false,
                                                         "stream_refcount");
41
// Schedules the stream's destruction closure (set up in
// grpc_stream_ref_init). If the current thread is not a background poller
// thread and the exec ctx is flagged as a thread-resource loop, the closure
// is redirected to an executor thread first — see the comment below.
void grpc_stream_destroy(grpc_stream_refcount* refcount) {
  if (!grpc_iomgr_is_any_background_poller_thread() &&
      (grpc_core::ExecCtx::Get()->flags() &
       GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP)) {
    /* Ick.
       The thread we're running on MAY be owned (indirectly) by a call-stack.
       If that's the case, destroying the call-stack MAY try to destroy the
       thread, which is a tangled mess that we just don't want to ever have to
       cope with.
       Throw this over to the executor (on a core-owned thread) and process it
       there. */
    refcount->destroy.scheduler =
        grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
  }
  GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
}
58
// Destruction callback installed on the stream-owned slice refcount
// (see grpc_stream_ref_init): when the last slice reference is dropped,
// forwards to grpc_stream_destroy. |arg| is the grpc_stream_refcount*.
void slice_stream_destroy(void* arg) {
  grpc_stream_destroy(static_cast<grpc_stream_refcount*>(arg));
}
62
// Recovers the enclosing grpc_stream_refcount* from a pointer to its
// embedded slice_refcount member (container-of idiom).
#define STREAM_REF_FROM_SLICE_REF(p)     \
  ((grpc_stream_refcount*)(((uint8_t*)p) - \
                           offsetof(grpc_stream_refcount, slice_refcount)))
66
grpc_slice_from_stream_owned_buffer(grpc_stream_refcount * refcount,void * buffer,size_t length)67 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
68 void* buffer, size_t length) {
69 #ifndef NDEBUG
70 grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(&refcount->slice_refcount),
71 "slice");
72 #else
73 grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(&refcount->slice_refcount));
74 #endif
75 grpc_slice res;
76 res.refcount = &refcount->slice_refcount;
77 res.data.refcounted.bytes = static_cast<uint8_t*>(buffer);
78 res.data.refcounted.length = length;
79 return res;
80 }
81
#ifndef NDEBUG
// Initializes |refcount| for a new stream. |cb|/|cb_arg| become the
// destruction closure scheduled by grpc_stream_destroy when the count hits
// zero. Debug builds additionally record |object_type| for refcount tracing.
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg,
                          const char* object_type) {
  refcount->object_type = object_type;
#else
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg) {
#endif
  GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);

  // Placement-new into caller-provided storage; the matching destructors are
  // never run explicitly — teardown happens via the destroy closure.
  // NOTE(review): |initial_refs| is not forwarded — the RefCount always
  // starts at 1. Confirm callers only ever pass 1.
  new (&refcount->refs) grpc_core::RefCount(1, &grpc_trace_stream_refcount);
  new (&refcount->slice_refcount) grpc_slice_refcount(
      grpc_slice_refcount::Type::REGULAR, &refcount->refs, slice_stream_destroy,
      refcount, &refcount->slice_refcount);
}
98
// Drains the 64-bit counter at |from| into |to|: |to| accumulates the value
// and |from| is reset to zero.
static void move64(uint64_t* from, uint64_t* to) {
  const uint64_t transferred = *from;
  *from = 0;
  *to += transferred;
}
103
// Accumulates all counters of |from| into |to| and zeroes |from|.
void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
                                       grpc_transport_one_way_stats* to) {
  move64(&from->framing_bytes, &to->framing_bytes);
  move64(&from->data_bytes, &to->data_bytes);
  move64(&from->header_bytes, &to->header_bytes);
}
110
// Accumulates both directions of |from|'s stream stats into |to|,
// zeroing |from| (see grpc_transport_move_one_way_stats).
void grpc_transport_move_stats(grpc_transport_stream_stats* from,
                               grpc_transport_stream_stats* to) {
  grpc_transport_move_one_way_stats(&from->incoming, &to->incoming);
  grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing);
}
116
// Returns the per-stream allocation size this transport requires, rounded up
// to the platform alignment so consecutive allocations stay aligned.
size_t grpc_transport_stream_size(grpc_transport* transport) {
  return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(transport->vtable->sizeof_stream);
}
120
// Destroys the transport via its vtable; |transport| must not be used after
// this call.
void grpc_transport_destroy(grpc_transport* transport) {
  transport->vtable->destroy(transport);
}
124
// Initializes |stream| (caller-allocated, grpc_transport_stream_size bytes)
// on |transport|. |server_data| is non-null for server-initiated streams;
// |arena| supplies call-lifetime allocations. Returns the vtable's result
// (0 on success).
int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
                               grpc_stream_refcount* refcount,
                               const void* server_data,
                               grpc_core::Arena* arena) {
  return transport->vtable->init_stream(transport, stream, refcount,
                                        server_data, arena);
}
132
// Dispatches a stream-level op batch to the transport's vtable.
void grpc_transport_perform_stream_op(grpc_transport* transport,
                                      grpc_stream* stream,
                                      grpc_transport_stream_op_batch* op) {
  transport->vtable->perform_stream_op(transport, stream, op);
}
138
// Dispatches a transport-level op to the transport's vtable.
void grpc_transport_perform_op(grpc_transport* transport,
                               grpc_transport_op* op) {
  transport->vtable->perform_op(transport, op);
}
143
144 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
145 grpc_polling_entity* pollent) {
146 grpc_pollset* pollset;
147 grpc_pollset_set* pollset_set;
148 if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) {
149 transport->vtable->set_pollset(transport, stream, pollset);
150 } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) !=
151 nullptr) {
152 transport->vtable->set_pollset_set(transport, stream, pollset_set);
153 } else {
154 // No-op for empty pollset. Empty pollset is possible when using
155 // non-fd-based event engines such as CFStream.
156 }
157 }
158
// Tears down |stream| on |transport|; |then_schedule_closure| is scheduled by
// the transport once destruction is complete.
void grpc_transport_destroy_stream(grpc_transport* transport,
                                   grpc_stream* stream,
                                   grpc_closure* then_schedule_closure) {
  transport->vtable->destroy_stream(transport, stream, then_schedule_closure);
}
164
// Returns the transport's underlying endpoint via the vtable (may be null
// depending on the transport implementation — determined by the vtable).
grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) {
  return transport->vtable->get_endpoint(transport);
}
168
// This comment should be sung to the tune of
// "Supercalifragilisticexpialidocious":
//
// grpc_transport_stream_op_batch_finish_with_failure
// is a function that must always unref cancel_error
// though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure
//
// Fails every pending sub-operation in |batch| with |error|: each receive
// callback and on_complete gets its own GRPC_ERROR_REF(error), run under
// |call_combiner|. Consumes one ref on |error| and, if the batch carries a
// cancel_stream op, its cancel_error as well.
void grpc_transport_stream_op_batch_finish_with_failure(
    grpc_transport_stream_op_batch* batch, grpc_error* error,
    grpc_core::CallCombiner* call_combiner) {
  if (batch->send_message) {
    // Drop the outgoing message payload; it will never be sent.
    batch->payload->send_message.send_message.reset();
  }
  if (batch->cancel_stream) {
    GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
  }
  // Construct a list of closures to execute.
  grpc_core::CallCombinerClosureList closures;
  if (batch->recv_initial_metadata) {
    closures.Add(
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
        GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
  }
  if (batch->recv_message) {
    closures.Add(batch->payload->recv_message.recv_message_ready,
                 GRPC_ERROR_REF(error), "failing recv_message_ready");
  }
  if (batch->recv_trailing_metadata) {
    closures.Add(
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
        GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
  }
  if (batch->on_complete != nullptr) {
    closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
                 "failing on_complete");
  }
  // Execute closures.
  closures.RunClosures(call_combiner);
  // Release the caller's ref on |error| (each closure above holds its own).
  GRPC_ERROR_UNREF(error);
}
209
// Bundles a grpc_transport_op with a self-cleanup closure: |outer_on_complete|
// frees this struct and then chains to the caller's |inner_on_complete|.
// Allocated by grpc_make_transport_op, freed by destroy_made_transport_op.
struct made_transport_op {
  grpc_closure outer_on_complete;
  grpc_closure* inner_on_complete = nullptr;
  grpc_transport_op op;
  made_transport_op() {
    // grpc_closure is a plain C struct with no constructor; zero it so no
    // field is left indeterminate before GRPC_CLOSURE_INIT runs.
    memset(&outer_on_complete, 0, sizeof(outer_on_complete));
  }
};
218
// on_consumed callback for ops created by grpc_make_transport_op: schedules
// the caller's original closure (if any) and frees the wrapper.
static void destroy_made_transport_op(void* arg, grpc_error* error) {
  made_transport_op* op = static_cast<made_transport_op*>(arg);
  GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error));
  grpc_core::Delete<made_transport_op>(op);
}
224
225 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) {
226 made_transport_op* op = grpc_core::New<made_transport_op>();
227 GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
228 grpc_schedule_on_exec_ctx);
229 op->inner_on_complete = on_complete;
230 op->op.on_consumed = &op->outer_on_complete;
231 return &op->op;
232 }
233
// Bundles a grpc_transport_stream_op_batch with its payload and a
// self-cleanup closure, mirroring made_transport_op for stream-level ops.
// Allocated (zeroed) by grpc_make_transport_stream_op, freed by
// destroy_made_transport_stream_op.
typedef struct {
  grpc_closure outer_on_complete;
  grpc_closure* inner_on_complete;
  grpc_transport_stream_op_batch op;
  grpc_transport_stream_op_batch_payload payload;
} made_transport_stream_op;
240
// on_complete callback for batches created by grpc_make_transport_stream_op:
// frees the wrapper, then runs the caller's original closure.
static void destroy_made_transport_stream_op(void* arg, grpc_error* error) {
  made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
  // Save the inner closure before freeing the wrapper that holds it.
  grpc_closure* c = op->inner_on_complete;
  gpr_free(op);
  GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error));
}
247
248 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
249 grpc_closure* on_complete) {
250 made_transport_stream_op* op =
251 static_cast<made_transport_stream_op*>(gpr_zalloc(sizeof(*op)));
252 op->op.payload = &op->payload;
253 GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
254 op, grpc_schedule_on_exec_ctx);
255 op->inner_on_complete = on_complete;
256 op->op.on_complete = &op->outer_on_complete;
257 return &op->op;
258 }
259