1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <string.h>
26 #include <vector>
27
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/grpc.h>
33 #include <grpc/impl/codegen/grpc_types.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
37
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/internal.h"
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/channel/handshaker.h"
43 #include "src/core/lib/channel/handshaker_registry.h"
44 #include "src/core/lib/gprpp/ref_counted.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/iomgr/endpoint.h"
47 #include "src/core/lib/iomgr/resolve_address.h"
48 #include "src/core/lib/iomgr/resource_quota.h"
49 #include "src/core/lib/iomgr/tcp_server.h"
50 #include "src/core/lib/iomgr/unix_sockets_posix.h"
51 #include "src/core/lib/slice/slice_internal.h"
52 #include "src/core/lib/surface/api_trace.h"
53 #include "src/core/lib/surface/server.h"
54
55 namespace grpc_core {
56 namespace {
57
// URI scheme prefixes that mark an address as a Unix domain socket (regular
// and abstract). Create() strips the matching prefix, using sizeof() - 1 as
// the prefix length, before resolving the rest of the address.
constexpr char kUnixUriPrefix[] = "unix:";
constexpr char kUnixAbstractUriPrefix[] = "unix-abstract:";
60
61 class Chttp2ServerListener : public Server::ListenerInterface {
62 public:
63 static grpc_error* Create(Server* server, const char* addr,
64 grpc_channel_args* args, int* port_num);
65
66 static grpc_error* CreateWithAcceptor(Server* server, const char* name,
67 grpc_channel_args* args);
68
69 // Do not instantiate directly. Use one of the factory methods above.
70 Chttp2ServerListener(Server* server, grpc_channel_args* args);
71 ~Chttp2ServerListener() override;
72
73 void Start(Server* server,
74 const std::vector<grpc_pollset*>* pollsets) override;
75
channelz_listen_socket_node() const76 channelz::ListenSocketNode* channelz_listen_socket_node() const override {
77 return channelz_listen_socket_.get();
78 }
79
80 void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
81
82 void Orphan() override;
83
84 private:
85 class ConnectionState : public RefCounted<ConnectionState> {
86 public:
87 ConnectionState(Chttp2ServerListener* listener,
88 grpc_pollset* accepting_pollset,
89 grpc_tcp_server_acceptor* acceptor,
90 RefCountedPtr<HandshakeManager> handshake_mgr,
91 grpc_channel_args* args, grpc_endpoint* endpoint);
92
93 ~ConnectionState() override;
94
95 private:
96 static void OnTimeout(void* arg, grpc_error* error);
97 static void OnReceiveSettings(void* arg, grpc_error* error);
98 static void OnHandshakeDone(void* arg, grpc_error* error);
99
100 Chttp2ServerListener* const listener_;
101 grpc_pollset* const accepting_pollset_;
102 grpc_tcp_server_acceptor* const acceptor_;
103 RefCountedPtr<HandshakeManager> handshake_mgr_;
104 // State for enforcing handshake timeout on receiving HTTP/2 settings.
105 grpc_chttp2_transport* transport_ = nullptr;
106 grpc_millis deadline_;
107 grpc_timer timer_;
108 grpc_closure on_timeout_;
109 grpc_closure on_receive_settings_;
110 grpc_pollset_set* const interested_parties_;
111 };
112
113 static void OnAccept(void* arg, grpc_endpoint* tcp,
114 grpc_pollset* accepting_pollset,
115 grpc_tcp_server_acceptor* acceptor);
116
117 RefCountedPtr<HandshakeManager> CreateHandshakeManager();
118
119 static void TcpServerShutdownComplete(void* arg, grpc_error* error);
120
121 static void DestroyListener(Server* /*server*/, void* arg,
122 grpc_closure* destroy_done);
123
124 Server* const server_;
125 grpc_channel_args* const args_;
126 grpc_tcp_server* tcp_server_;
127 Mutex mu_;
128 bool shutdown_ = true;
129 grpc_closure tcp_server_shutdown_complete_;
130 grpc_closure* on_destroy_done_ = nullptr;
131 HandshakeManager* pending_handshake_mgrs_ = nullptr;
132 RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
133 };
134
135 //
136 // Chttp2ServerListener::ConnectionState
137 //
138
GetConnectionDeadline(const grpc_channel_args * args)139 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
140 int timeout_ms =
141 grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
142 {120 * GPR_MS_PER_SEC, 1, INT_MAX});
143 return ExecCtx::Get()->Now() + timeout_ms;
144 }
145
ConnectionState(Chttp2ServerListener * listener,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor,RefCountedPtr<HandshakeManager> handshake_mgr,grpc_channel_args * args,grpc_endpoint * endpoint)146 Chttp2ServerListener::ConnectionState::ConnectionState(
147 Chttp2ServerListener* listener, grpc_pollset* accepting_pollset,
148 grpc_tcp_server_acceptor* acceptor,
149 RefCountedPtr<HandshakeManager> handshake_mgr, grpc_channel_args* args,
150 grpc_endpoint* endpoint)
151 : listener_(listener),
152 accepting_pollset_(accepting_pollset),
153 acceptor_(acceptor),
154 handshake_mgr_(std::move(handshake_mgr)),
155 deadline_(GetConnectionDeadline(args)),
156 interested_parties_(grpc_pollset_set_create()) {
157 grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
158 HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
159 interested_parties_, handshake_mgr_.get());
160 handshake_mgr_->DoHandshake(endpoint, args, deadline_, acceptor_,
161 OnHandshakeDone, this);
162 }
163
~ConnectionState()164 Chttp2ServerListener::ConnectionState::~ConnectionState() {
165 if (transport_ != nullptr) {
166 GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "receive settings timeout");
167 }
168 grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
169 grpc_pollset_set_destroy(interested_parties_);
170 }
171
OnTimeout(void * arg,grpc_error * error)172 void Chttp2ServerListener::ConnectionState::OnTimeout(void* arg,
173 grpc_error* error) {
174 ConnectionState* self = static_cast<ConnectionState*>(arg);
175 // Note that we may be called with GRPC_ERROR_NONE when the timer fires
176 // or with an error indicating that the timer system is being shut down.
177 if (error != GRPC_ERROR_CANCELLED) {
178 grpc_transport_op* op = grpc_make_transport_op(nullptr);
179 op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
180 "Did not receive HTTP/2 settings before handshake timeout");
181 grpc_transport_perform_op(&self->transport_->base, op);
182 }
183 self->Unref();
184 }
185
OnReceiveSettings(void * arg,grpc_error * error)186 void Chttp2ServerListener::ConnectionState::OnReceiveSettings(
187 void* arg, grpc_error* error) {
188 ConnectionState* self = static_cast<ConnectionState*>(arg);
189 if (error == GRPC_ERROR_NONE) {
190 grpc_timer_cancel(&self->timer_);
191 }
192 self->Unref();
193 }
194
OnHandshakeDone(void * arg,grpc_error * error)195 void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
196 grpc_error* error) {
197 auto* args = static_cast<HandshakerArgs*>(arg);
198 ConnectionState* self = static_cast<ConnectionState*>(args->user_data);
199 {
200 MutexLock lock(&self->listener_->mu_);
201 grpc_resource_user* resource_user =
202 self->listener_->server_->default_resource_user();
203 if (error != GRPC_ERROR_NONE || self->listener_->shutdown_) {
204 const char* error_str = grpc_error_string(error);
205 gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
206 if (resource_user != nullptr) {
207 grpc_resource_user_free(resource_user,
208 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
209 }
210 if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
211 // We were shut down after handshaking completed successfully, so
212 // destroy the endpoint here.
213 // TODO(ctiller): It is currently necessary to shutdown endpoints
214 // before destroying them, even if we know that there are no
215 // pending read/write callbacks. This should be fixed, at which
216 // point this can be removed.
217 grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
218 grpc_endpoint_destroy(args->endpoint);
219 grpc_channel_args_destroy(args->args);
220 grpc_slice_buffer_destroy_internal(args->read_buffer);
221 gpr_free(args->read_buffer);
222 }
223 } else {
224 // If the handshaking succeeded but there is no endpoint, then the
225 // handshaker may have handed off the connection to some external
226 // code, so we can just clean up here without creating a transport.
227 if (args->endpoint != nullptr) {
228 grpc_transport* transport = grpc_create_chttp2_transport(
229 args->args, args->endpoint, false, resource_user);
230 self->listener_->server_->SetupTransport(
231 transport, self->accepting_pollset_, args->args,
232 grpc_chttp2_transport_get_socket_node(transport), resource_user);
233 // Use notify_on_receive_settings callback to enforce the
234 // handshake deadline.
235 // Note: The reinterpret_cast<>s here are safe, because
236 // grpc_chttp2_transport is a C-style extension of
237 // grpc_transport, so this is morally equivalent of a
238 // static_cast<> to a derived class.
239 // TODO(roth): Change to static_cast<> when we C++-ify the
240 // transport API.
241 self->transport_ = reinterpret_cast<grpc_chttp2_transport*>(transport);
242 self->Ref().release(); // Held by OnReceiveSettings().
243 GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
244 grpc_schedule_on_exec_ctx);
245 grpc_chttp2_transport_start_reading(transport, args->read_buffer,
246 &self->on_receive_settings_);
247 grpc_channel_args_destroy(args->args);
248 self->Ref().release(); // Held by OnTimeout().
249 GRPC_CHTTP2_REF_TRANSPORT(
250 reinterpret_cast<grpc_chttp2_transport*>(transport),
251 "receive settings timeout");
252 GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
253 grpc_schedule_on_exec_ctx);
254 grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
255 } else {
256 if (resource_user != nullptr) {
257 grpc_resource_user_free(resource_user,
258 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
259 }
260 }
261 }
262 self->handshake_mgr_->RemoveFromPendingMgrList(
263 &self->listener_->pending_handshake_mgrs_);
264 }
265 self->handshake_mgr_.reset();
266 gpr_free(self->acceptor_);
267 grpc_tcp_server_unref(self->listener_->tcp_server_);
268 self->Unref();
269 }
270
271 //
272 // Chttp2ServerListener
273 //
274
Create(Server * server,const char * addr,grpc_channel_args * args,int * port_num)275 grpc_error* Chttp2ServerListener::Create(Server* server, const char* addr,
276 grpc_channel_args* args,
277 int* port_num) {
278 std::vector<grpc_error*> error_list;
279 grpc_resolved_addresses* resolved = nullptr;
280 Chttp2ServerListener* listener = nullptr;
281 // The bulk of this method is inside of a lambda to make cleanup
282 // easier without using goto.
283 grpc_error* error = [&]() {
284 *port_num = -1;
285 /* resolve address */
286 grpc_error* error = GRPC_ERROR_NONE;
287 if (absl::StartsWith(addr, kUnixUriPrefix)) {
288 error = grpc_resolve_unix_domain_address(
289 addr + sizeof(kUnixUriPrefix) - 1, &resolved);
290 } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
291 error = grpc_resolve_unix_abstract_domain_address(
292 addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
293 } else {
294 error = grpc_blocking_resolve_address(addr, "https", &resolved);
295 }
296 if (error != GRPC_ERROR_NONE) return error;
297 // Create Chttp2ServerListener.
298 listener = new Chttp2ServerListener(server, args);
299 error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
300 args, &listener->tcp_server_);
301 if (error != GRPC_ERROR_NONE) return error;
302 for (size_t i = 0; i < resolved->naddrs; i++) {
303 int port_temp;
304 error = grpc_tcp_server_add_port(listener->tcp_server_,
305 &resolved->addrs[i], &port_temp);
306 if (error != GRPC_ERROR_NONE) {
307 error_list.push_back(error);
308 } else {
309 if (*port_num == -1) {
310 *port_num = port_temp;
311 } else {
312 GPR_ASSERT(*port_num == port_temp);
313 }
314 }
315 }
316 if (error_list.size() == resolved->naddrs) {
317 std::string msg =
318 absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
319 resolved->naddrs);
320 return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
321 msg.c_str(), error_list.data(), error_list.size());
322 } else if (!error_list.empty()) {
323 std::string msg = absl::StrFormat(
324 "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
325 " resolved",
326 resolved->naddrs - error_list.size(), resolved->naddrs);
327 error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
328 msg.c_str(), error_list.data(), error_list.size());
329 gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error));
330 GRPC_ERROR_UNREF(error);
331 /* we managed to bind some addresses: continue */
332 }
333 // Create channelz node.
334 if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
335 GRPC_ENABLE_CHANNELZ_DEFAULT)) {
336 listener->channelz_listen_socket_ =
337 MakeRefCounted<channelz::ListenSocketNode>(
338 addr, absl::StrFormat("chttp2 listener %s", addr));
339 }
340 /* Register with the server only upon success */
341 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
342 return GRPC_ERROR_NONE;
343 }();
344 if (resolved != nullptr) {
345 grpc_resolved_addresses_destroy(resolved);
346 }
347 if (error != GRPC_ERROR_NONE) {
348 if (listener != nullptr) {
349 if (listener->tcp_server_ != nullptr) {
350 grpc_tcp_server_unref(listener->tcp_server_);
351 } else {
352 delete listener;
353 }
354 } else {
355 grpc_channel_args_destroy(args);
356 }
357 *port_num = 0;
358 }
359 for (grpc_error* error : error_list) {
360 GRPC_ERROR_UNREF(error);
361 }
362 return error;
363 }
364
CreateWithAcceptor(Server * server,const char * name,grpc_channel_args * args)365 grpc_error* Chttp2ServerListener::CreateWithAcceptor(Server* server,
366 const char* name,
367 grpc_channel_args* args) {
368 Chttp2ServerListener* listener = new Chttp2ServerListener(server, args);
369 grpc_error* error = grpc_tcp_server_create(
370 &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
371 if (error != GRPC_ERROR_NONE) {
372 delete listener;
373 return error;
374 }
375 // TODO(yangg) channelz
376 TcpServerFdHandler** arg_val =
377 grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
378 *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
379 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
380 return GRPC_ERROR_NONE;
381 }
382
Chttp2ServerListener(Server * server,grpc_channel_args * args)383 Chttp2ServerListener::Chttp2ServerListener(Server* server,
384 grpc_channel_args* args)
385 : server_(server), args_(args) {
386 GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
387 this, grpc_schedule_on_exec_ctx);
388 }
389
~Chttp2ServerListener()390 Chttp2ServerListener::~Chttp2ServerListener() {
391 grpc_channel_args_destroy(args_);
392 }
393
394 /* Server callback: start listening on our ports */
Start(Server *,const std::vector<grpc_pollset * > * pollsets)395 void Chttp2ServerListener::Start(Server* /*server*/,
396 const std::vector<grpc_pollset*>* pollsets) {
397 {
398 MutexLock lock(&mu_);
399 shutdown_ = false;
400 }
401 grpc_tcp_server_start(tcp_server_, pollsets, OnAccept, this);
402 }
403
SetOnDestroyDone(grpc_closure * on_destroy_done)404 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
405 MutexLock lock(&mu_);
406 on_destroy_done_ = on_destroy_done;
407 }
408
CreateHandshakeManager()409 RefCountedPtr<HandshakeManager> Chttp2ServerListener::CreateHandshakeManager() {
410 MutexLock lock(&mu_);
411 if (shutdown_) return nullptr;
412 grpc_resource_user* resource_user = server_->default_resource_user();
413 if (resource_user != nullptr &&
414 !grpc_resource_user_safe_alloc(resource_user,
415 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
416 gpr_log(GPR_ERROR,
417 "Memory quota exhausted, rejecting connection, no handshaking.");
418 return nullptr;
419 }
420 auto handshake_mgr = MakeRefCounted<HandshakeManager>();
421 handshake_mgr->AddToPendingMgrList(&pending_handshake_mgrs_);
422 grpc_tcp_server_ref(tcp_server_); // Ref held by ConnectionState.
423 return handshake_mgr;
424 }
425
OnAccept(void * arg,grpc_endpoint * tcp,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor)426 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
427 grpc_pollset* accepting_pollset,
428 grpc_tcp_server_acceptor* acceptor) {
429 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
430 RefCountedPtr<HandshakeManager> handshake_mgr =
431 self->CreateHandshakeManager();
432 if (handshake_mgr == nullptr) {
433 grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
434 grpc_endpoint_destroy(tcp);
435 gpr_free(acceptor);
436 return;
437 }
438 // Deletes itself when done.
439 new ConnectionState(self, accepting_pollset, acceptor,
440 std::move(handshake_mgr), self->args_, tcp);
441 }
442
TcpServerShutdownComplete(void * arg,grpc_error * error)443 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
444 grpc_error* error) {
445 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
446 /* ensure all threads have unlocked */
447 grpc_closure* destroy_done = nullptr;
448 {
449 MutexLock lock(&self->mu_);
450 destroy_done = self->on_destroy_done_;
451 GPR_ASSERT(self->shutdown_);
452 if (self->pending_handshake_mgrs_ != nullptr) {
453 self->pending_handshake_mgrs_->ShutdownAllPending(GRPC_ERROR_REF(error));
454 }
455 self->channelz_listen_socket_.reset();
456 }
457 // Flush queued work before destroying handshaker factory, since that
458 // may do a synchronous unref.
459 ExecCtx::Get()->Flush();
460 if (destroy_done != nullptr) {
461 ExecCtx::Run(DEBUG_LOCATION, destroy_done, GRPC_ERROR_REF(error));
462 ExecCtx::Get()->Flush();
463 }
464 delete self;
465 }
466
467 /* Server callback: destroy the tcp listener (so we don't generate further
468 callbacks) */
Orphan()469 void Chttp2ServerListener::Orphan() {
470 grpc_tcp_server* tcp_server;
471 {
472 MutexLock lock(&mu_);
473 shutdown_ = true;
474 tcp_server = tcp_server_;
475 }
476 grpc_tcp_server_shutdown_listeners(tcp_server);
477 grpc_tcp_server_unref(tcp_server);
478 }
479
480 } // namespace
481
482 //
483 // Chttp2ServerAddPort()
484 //
485
Chttp2ServerAddPort(Server * server,const char * addr,grpc_channel_args * args,int * port_num)486 grpc_error* Chttp2ServerAddPort(Server* server, const char* addr,
487 grpc_channel_args* args, int* port_num) {
488 if (strncmp(addr, "external:", 9) == 0) {
489 return grpc_core::Chttp2ServerListener::CreateWithAcceptor(server, addr,
490 args);
491 }
492 return grpc_core::Chttp2ServerListener::Create(server, addr, args, port_num);
493 }
494
495 } // namespace grpc_core
496