1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <string.h>
26 #include <vector>
27 
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 
32 #include <grpc/grpc.h>
33 #include <grpc/impl/codegen/grpc_types.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
37 
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/internal.h"
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/channel/handshaker.h"
43 #include "src/core/lib/channel/handshaker_registry.h"
44 #include "src/core/lib/gprpp/ref_counted.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/iomgr/endpoint.h"
47 #include "src/core/lib/iomgr/resolve_address.h"
48 #include "src/core/lib/iomgr/resource_quota.h"
49 #include "src/core/lib/iomgr/tcp_server.h"
50 #include "src/core/lib/iomgr/unix_sockets_posix.h"
51 #include "src/core/lib/slice/slice_internal.h"
52 #include "src/core/lib/surface/api_trace.h"
53 #include "src/core/lib/surface/server.h"
54 
55 namespace grpc_core {
56 namespace {
57 
58 const char kUnixUriPrefix[] = "unix:";
59 const char kUnixAbstractUriPrefix[] = "unix-abstract:";
60 
61 class Chttp2ServerListener : public Server::ListenerInterface {
62  public:
63   static grpc_error* Create(Server* server, const char* addr,
64                             grpc_channel_args* args, int* port_num);
65 
66   static grpc_error* CreateWithAcceptor(Server* server, const char* name,
67                                         grpc_channel_args* args);
68 
69   // Do not instantiate directly.  Use one of the factory methods above.
70   Chttp2ServerListener(Server* server, grpc_channel_args* args);
71   ~Chttp2ServerListener() override;
72 
73   void Start(Server* server,
74              const std::vector<grpc_pollset*>* pollsets) override;
75 
channelz_listen_socket_node() const76   channelz::ListenSocketNode* channelz_listen_socket_node() const override {
77     return channelz_listen_socket_.get();
78   }
79 
80   void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
81 
82   void Orphan() override;
83 
84  private:
85   class ConnectionState : public RefCounted<ConnectionState> {
86    public:
87     ConnectionState(Chttp2ServerListener* listener,
88                     grpc_pollset* accepting_pollset,
89                     grpc_tcp_server_acceptor* acceptor,
90                     RefCountedPtr<HandshakeManager> handshake_mgr,
91                     grpc_channel_args* args, grpc_endpoint* endpoint);
92 
93     ~ConnectionState() override;
94 
95    private:
96     static void OnTimeout(void* arg, grpc_error* error);
97     static void OnReceiveSettings(void* arg, grpc_error* error);
98     static void OnHandshakeDone(void* arg, grpc_error* error);
99 
100     Chttp2ServerListener* const listener_;
101     grpc_pollset* const accepting_pollset_;
102     grpc_tcp_server_acceptor* const acceptor_;
103     RefCountedPtr<HandshakeManager> handshake_mgr_;
104     // State for enforcing handshake timeout on receiving HTTP/2 settings.
105     grpc_chttp2_transport* transport_ = nullptr;
106     grpc_millis deadline_;
107     grpc_timer timer_;
108     grpc_closure on_timeout_;
109     grpc_closure on_receive_settings_;
110     grpc_pollset_set* const interested_parties_;
111   };
112 
113   static void OnAccept(void* arg, grpc_endpoint* tcp,
114                        grpc_pollset* accepting_pollset,
115                        grpc_tcp_server_acceptor* acceptor);
116 
117   RefCountedPtr<HandshakeManager> CreateHandshakeManager();
118 
119   static void TcpServerShutdownComplete(void* arg, grpc_error* error);
120 
121   static void DestroyListener(Server* /*server*/, void* arg,
122                               grpc_closure* destroy_done);
123 
124   Server* const server_;
125   grpc_channel_args* const args_;
126   grpc_tcp_server* tcp_server_;
127   Mutex mu_;
128   bool shutdown_ = true;
129   grpc_closure tcp_server_shutdown_complete_;
130   grpc_closure* on_destroy_done_ = nullptr;
131   HandshakeManager* pending_handshake_mgrs_ = nullptr;
132   RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
133 };
134 
135 //
136 // Chttp2ServerListener::ConnectionState
137 //
138 
GetConnectionDeadline(const grpc_channel_args * args)139 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
140   int timeout_ms =
141       grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
142                                      {120 * GPR_MS_PER_SEC, 1, INT_MAX});
143   return ExecCtx::Get()->Now() + timeout_ms;
144 }
145 
ConnectionState(Chttp2ServerListener * listener,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor,RefCountedPtr<HandshakeManager> handshake_mgr,grpc_channel_args * args,grpc_endpoint * endpoint)146 Chttp2ServerListener::ConnectionState::ConnectionState(
147     Chttp2ServerListener* listener, grpc_pollset* accepting_pollset,
148     grpc_tcp_server_acceptor* acceptor,
149     RefCountedPtr<HandshakeManager> handshake_mgr, grpc_channel_args* args,
150     grpc_endpoint* endpoint)
151     : listener_(listener),
152       accepting_pollset_(accepting_pollset),
153       acceptor_(acceptor),
154       handshake_mgr_(std::move(handshake_mgr)),
155       deadline_(GetConnectionDeadline(args)),
156       interested_parties_(grpc_pollset_set_create()) {
157   grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
158   HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
159                                      interested_parties_, handshake_mgr_.get());
160   handshake_mgr_->DoHandshake(endpoint, args, deadline_, acceptor_,
161                               OnHandshakeDone, this);
162 }
163 
~ConnectionState()164 Chttp2ServerListener::ConnectionState::~ConnectionState() {
165   if (transport_ != nullptr) {
166     GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "receive settings timeout");
167   }
168   grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
169   grpc_pollset_set_destroy(interested_parties_);
170 }
171 
OnTimeout(void * arg,grpc_error * error)172 void Chttp2ServerListener::ConnectionState::OnTimeout(void* arg,
173                                                       grpc_error* error) {
174   ConnectionState* self = static_cast<ConnectionState*>(arg);
175   // Note that we may be called with GRPC_ERROR_NONE when the timer fires
176   // or with an error indicating that the timer system is being shut down.
177   if (error != GRPC_ERROR_CANCELLED) {
178     grpc_transport_op* op = grpc_make_transport_op(nullptr);
179     op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
180         "Did not receive HTTP/2 settings before handshake timeout");
181     grpc_transport_perform_op(&self->transport_->base, op);
182   }
183   self->Unref();
184 }
185 
OnReceiveSettings(void * arg,grpc_error * error)186 void Chttp2ServerListener::ConnectionState::OnReceiveSettings(
187     void* arg, grpc_error* error) {
188   ConnectionState* self = static_cast<ConnectionState*>(arg);
189   if (error == GRPC_ERROR_NONE) {
190     grpc_timer_cancel(&self->timer_);
191   }
192   self->Unref();
193 }
194 
OnHandshakeDone(void * arg,grpc_error * error)195 void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
196                                                             grpc_error* error) {
197   auto* args = static_cast<HandshakerArgs*>(arg);
198   ConnectionState* self = static_cast<ConnectionState*>(args->user_data);
199   {
200     MutexLock lock(&self->listener_->mu_);
201     grpc_resource_user* resource_user =
202         self->listener_->server_->default_resource_user();
203     if (error != GRPC_ERROR_NONE || self->listener_->shutdown_) {
204       const char* error_str = grpc_error_string(error);
205       gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
206       if (resource_user != nullptr) {
207         grpc_resource_user_free(resource_user,
208                                 GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
209       }
210       if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
211         // We were shut down after handshaking completed successfully, so
212         // destroy the endpoint here.
213         // TODO(ctiller): It is currently necessary to shutdown endpoints
214         // before destroying them, even if we know that there are no
215         // pending read/write callbacks.  This should be fixed, at which
216         // point this can be removed.
217         grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
218         grpc_endpoint_destroy(args->endpoint);
219         grpc_channel_args_destroy(args->args);
220         grpc_slice_buffer_destroy_internal(args->read_buffer);
221         gpr_free(args->read_buffer);
222       }
223     } else {
224       // If the handshaking succeeded but there is no endpoint, then the
225       // handshaker may have handed off the connection to some external
226       // code, so we can just clean up here without creating a transport.
227       if (args->endpoint != nullptr) {
228         grpc_transport* transport = grpc_create_chttp2_transport(
229             args->args, args->endpoint, false, resource_user);
230         self->listener_->server_->SetupTransport(
231             transport, self->accepting_pollset_, args->args,
232             grpc_chttp2_transport_get_socket_node(transport), resource_user);
233         // Use notify_on_receive_settings callback to enforce the
234         // handshake deadline.
235         // Note: The reinterpret_cast<>s here are safe, because
236         // grpc_chttp2_transport is a C-style extension of
237         // grpc_transport, so this is morally equivalent of a
238         // static_cast<> to a derived class.
239         // TODO(roth): Change to static_cast<> when we C++-ify the
240         // transport API.
241         self->transport_ = reinterpret_cast<grpc_chttp2_transport*>(transport);
242         self->Ref().release();  // Held by OnReceiveSettings().
243         GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
244                           grpc_schedule_on_exec_ctx);
245         grpc_chttp2_transport_start_reading(transport, args->read_buffer,
246                                             &self->on_receive_settings_);
247         grpc_channel_args_destroy(args->args);
248         self->Ref().release();  // Held by OnTimeout().
249         GRPC_CHTTP2_REF_TRANSPORT(
250             reinterpret_cast<grpc_chttp2_transport*>(transport),
251             "receive settings timeout");
252         GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
253                           grpc_schedule_on_exec_ctx);
254         grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
255       } else {
256         if (resource_user != nullptr) {
257           grpc_resource_user_free(resource_user,
258                                   GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
259         }
260       }
261     }
262     self->handshake_mgr_->RemoveFromPendingMgrList(
263         &self->listener_->pending_handshake_mgrs_);
264   }
265   self->handshake_mgr_.reset();
266   gpr_free(self->acceptor_);
267   grpc_tcp_server_unref(self->listener_->tcp_server_);
268   self->Unref();
269 }
270 
271 //
272 // Chttp2ServerListener
273 //
274 
Create(Server * server,const char * addr,grpc_channel_args * args,int * port_num)275 grpc_error* Chttp2ServerListener::Create(Server* server, const char* addr,
276                                          grpc_channel_args* args,
277                                          int* port_num) {
278   std::vector<grpc_error*> error_list;
279   grpc_resolved_addresses* resolved = nullptr;
280   Chttp2ServerListener* listener = nullptr;
281   // The bulk of this method is inside of a lambda to make cleanup
282   // easier without using goto.
283   grpc_error* error = [&]() {
284     *port_num = -1;
285     /* resolve address */
286     grpc_error* error = GRPC_ERROR_NONE;
287     if (absl::StartsWith(addr, kUnixUriPrefix)) {
288       error = grpc_resolve_unix_domain_address(
289           addr + sizeof(kUnixUriPrefix) - 1, &resolved);
290     } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
291       error = grpc_resolve_unix_abstract_domain_address(
292           addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
293     } else {
294       error = grpc_blocking_resolve_address(addr, "https", &resolved);
295     }
296     if (error != GRPC_ERROR_NONE) return error;
297     // Create Chttp2ServerListener.
298     listener = new Chttp2ServerListener(server, args);
299     error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
300                                    args, &listener->tcp_server_);
301     if (error != GRPC_ERROR_NONE) return error;
302     for (size_t i = 0; i < resolved->naddrs; i++) {
303       int port_temp;
304       error = grpc_tcp_server_add_port(listener->tcp_server_,
305                                        &resolved->addrs[i], &port_temp);
306       if (error != GRPC_ERROR_NONE) {
307         error_list.push_back(error);
308       } else {
309         if (*port_num == -1) {
310           *port_num = port_temp;
311         } else {
312           GPR_ASSERT(*port_num == port_temp);
313         }
314       }
315     }
316     if (error_list.size() == resolved->naddrs) {
317       std::string msg =
318           absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
319                           resolved->naddrs);
320       return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
321           msg.c_str(), error_list.data(), error_list.size());
322     } else if (!error_list.empty()) {
323       std::string msg = absl::StrFormat(
324           "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
325           " resolved",
326           resolved->naddrs - error_list.size(), resolved->naddrs);
327       error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
328           msg.c_str(), error_list.data(), error_list.size());
329       gpr_log(GPR_INFO, "WARNING: %s", grpc_error_string(error));
330       GRPC_ERROR_UNREF(error);
331       /* we managed to bind some addresses: continue */
332     }
333     // Create channelz node.
334     if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
335                                     GRPC_ENABLE_CHANNELZ_DEFAULT)) {
336       listener->channelz_listen_socket_ =
337           MakeRefCounted<channelz::ListenSocketNode>(
338               addr, absl::StrFormat("chttp2 listener %s", addr));
339     }
340     /* Register with the server only upon success */
341     server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
342     return GRPC_ERROR_NONE;
343   }();
344   if (resolved != nullptr) {
345     grpc_resolved_addresses_destroy(resolved);
346   }
347   if (error != GRPC_ERROR_NONE) {
348     if (listener != nullptr) {
349       if (listener->tcp_server_ != nullptr) {
350         grpc_tcp_server_unref(listener->tcp_server_);
351       } else {
352         delete listener;
353       }
354     } else {
355       grpc_channel_args_destroy(args);
356     }
357     *port_num = 0;
358   }
359   for (grpc_error* error : error_list) {
360     GRPC_ERROR_UNREF(error);
361   }
362   return error;
363 }
364 
CreateWithAcceptor(Server * server,const char * name,grpc_channel_args * args)365 grpc_error* Chttp2ServerListener::CreateWithAcceptor(Server* server,
366                                                      const char* name,
367                                                      grpc_channel_args* args) {
368   Chttp2ServerListener* listener = new Chttp2ServerListener(server, args);
369   grpc_error* error = grpc_tcp_server_create(
370       &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
371   if (error != GRPC_ERROR_NONE) {
372     delete listener;
373     return error;
374   }
375   // TODO(yangg) channelz
376   TcpServerFdHandler** arg_val =
377       grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
378   *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
379   server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
380   return GRPC_ERROR_NONE;
381 }
382 
Chttp2ServerListener(Server * server,grpc_channel_args * args)383 Chttp2ServerListener::Chttp2ServerListener(Server* server,
384                                            grpc_channel_args* args)
385     : server_(server), args_(args) {
386   GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
387                     this, grpc_schedule_on_exec_ctx);
388 }
389 
~Chttp2ServerListener()390 Chttp2ServerListener::~Chttp2ServerListener() {
391   grpc_channel_args_destroy(args_);
392 }
393 
394 /* Server callback: start listening on our ports */
Start(Server *,const std::vector<grpc_pollset * > * pollsets)395 void Chttp2ServerListener::Start(Server* /*server*/,
396                                  const std::vector<grpc_pollset*>* pollsets) {
397   {
398     MutexLock lock(&mu_);
399     shutdown_ = false;
400   }
401   grpc_tcp_server_start(tcp_server_, pollsets, OnAccept, this);
402 }
403 
SetOnDestroyDone(grpc_closure * on_destroy_done)404 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
405   MutexLock lock(&mu_);
406   on_destroy_done_ = on_destroy_done;
407 }
408 
CreateHandshakeManager()409 RefCountedPtr<HandshakeManager> Chttp2ServerListener::CreateHandshakeManager() {
410   MutexLock lock(&mu_);
411   if (shutdown_) return nullptr;
412   grpc_resource_user* resource_user = server_->default_resource_user();
413   if (resource_user != nullptr &&
414       !grpc_resource_user_safe_alloc(resource_user,
415                                      GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
416     gpr_log(GPR_ERROR,
417             "Memory quota exhausted, rejecting connection, no handshaking.");
418     return nullptr;
419   }
420   auto handshake_mgr = MakeRefCounted<HandshakeManager>();
421   handshake_mgr->AddToPendingMgrList(&pending_handshake_mgrs_);
422   grpc_tcp_server_ref(tcp_server_);  // Ref held by ConnectionState.
423   return handshake_mgr;
424 }
425 
OnAccept(void * arg,grpc_endpoint * tcp,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor)426 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
427                                     grpc_pollset* accepting_pollset,
428                                     grpc_tcp_server_acceptor* acceptor) {
429   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
430   RefCountedPtr<HandshakeManager> handshake_mgr =
431       self->CreateHandshakeManager();
432   if (handshake_mgr == nullptr) {
433     grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
434     grpc_endpoint_destroy(tcp);
435     gpr_free(acceptor);
436     return;
437   }
438   // Deletes itself when done.
439   new ConnectionState(self, accepting_pollset, acceptor,
440                       std::move(handshake_mgr), self->args_, tcp);
441 }
442 
TcpServerShutdownComplete(void * arg,grpc_error * error)443 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
444                                                      grpc_error* error) {
445   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
446   /* ensure all threads have unlocked */
447   grpc_closure* destroy_done = nullptr;
448   {
449     MutexLock lock(&self->mu_);
450     destroy_done = self->on_destroy_done_;
451     GPR_ASSERT(self->shutdown_);
452     if (self->pending_handshake_mgrs_ != nullptr) {
453       self->pending_handshake_mgrs_->ShutdownAllPending(GRPC_ERROR_REF(error));
454     }
455     self->channelz_listen_socket_.reset();
456   }
457   // Flush queued work before destroying handshaker factory, since that
458   // may do a synchronous unref.
459   ExecCtx::Get()->Flush();
460   if (destroy_done != nullptr) {
461     ExecCtx::Run(DEBUG_LOCATION, destroy_done, GRPC_ERROR_REF(error));
462     ExecCtx::Get()->Flush();
463   }
464   delete self;
465 }
466 
467 /* Server callback: destroy the tcp listener (so we don't generate further
468    callbacks) */
Orphan()469 void Chttp2ServerListener::Orphan() {
470   grpc_tcp_server* tcp_server;
471   {
472     MutexLock lock(&mu_);
473     shutdown_ = true;
474     tcp_server = tcp_server_;
475   }
476   grpc_tcp_server_shutdown_listeners(tcp_server);
477   grpc_tcp_server_unref(tcp_server);
478 }
479 
480 }  // namespace
481 
482 //
483 // Chttp2ServerAddPort()
484 //
485 
Chttp2ServerAddPort(Server * server,const char * addr,grpc_channel_args * args,int * port_num)486 grpc_error* Chttp2ServerAddPort(Server* server, const char* addr,
487                                 grpc_channel_args* args, int* port_num) {
488   if (strncmp(addr, "external:", 9) == 0) {
489     return grpc_core::Chttp2ServerListener::CreateWithAcceptor(server, addr,
490                                                                args);
491   }
492   return grpc_core::Chttp2ServerListener::Create(server, addr, args, port_num);
493 }
494 
495 }  // namespace grpc_core
496