1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SharedMutex.h>
21 #include <folly/executors/IOThreadPoolExecutor.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/io/async/EventBaseManager.h>
24 #include <wangle/acceptor/Acceptor.h>
25 #include <wangle/acceptor/ManagedConnection.h>
26 #include <wangle/acceptor/SharedSSLContextManager.h>
27 #include <wangle/bootstrap/ServerSocketFactory.h>
28 #include <wangle/channel/Handler.h>
29 #include <wangle/channel/Pipeline.h>
30 #include <wangle/ssl/SSLStats.h>
31 
32 namespace wangle {
33 
34 class AcceptorException : public std::runtime_error {
35  public:
36   enum class ExceptionType {
37     UNKNOWN = 0,
38     TIMED_OUT = 1,
39     DROPPED = 2,
40     ACCEPT_STOPPED = 3,
41     DRAIN_CONN_PCT = 4,
42     DROP_CONN_PCT = 5,
43     FORCE_STOP = 6,
44     INTERNAL_ERROR = 7,
45     ACCEPT_PAUSED = 8,
46     ACCEPT_RESUMED = 9,
47     SHUTDOWN_PENDING = 10, // A graceful shutdown has been scheduled.
48   };
49 
AcceptorException(ExceptionType type)50   explicit AcceptorException(ExceptionType type)
51       : std::runtime_error(""), type_(type), pct_(0.0) {}
52 
AcceptorException(ExceptionType type,const std::string & message)53   explicit AcceptorException(ExceptionType type, const std::string& message)
54       : std::runtime_error(message), type_(type), pct_(0.0) {}
55 
AcceptorException(ExceptionType type,const std::string & message,double pct)56   explicit AcceptorException(
57       ExceptionType type,
58       const std::string& message,
59       double pct)
60       : std::runtime_error(message), type_(type), pct_(pct) {}
61 
getType()62   ExceptionType getType() const noexcept {
63     return type_;
64   }
getPct()65   double getPct() const noexcept {
66     return pct_;
67   }
68 
69  protected:
70   const ExceptionType type_;
71   // the percentage of connections to be drained or dropped during the shutdown
72   const double pct_;
73 };
74 
75 template <typename Pipeline>
76 class ServerAcceptor : public Acceptor,
77                        public wangle::InboundHandler<AcceptPipelineType> {
78  public:
79   using OnDataAvailableParams =
80       folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams;
81 
82   class ServerConnection : public wangle::ManagedConnection,
83                            public wangle::PipelineManager {
84    public:
ServerConnection(typename Pipeline::Ptr pipeline)85     explicit ServerConnection(typename Pipeline::Ptr pipeline)
86         : pipeline_(std::move(pipeline)) {
87       pipeline_->setPipelineManager(this);
88     }
89 
timeoutExpired()90     void timeoutExpired() noexcept override {
91       auto ew = folly::make_exception_wrapper<AcceptorException>(
92           AcceptorException::ExceptionType::TIMED_OUT, "timeout");
93       pipeline_->readException(ew);
94     }
95 
describe(std::ostream &)96     void describe(std::ostream&) const override {}
isBusy()97     bool isBusy() const override {
98       return true;
99     }
100 
notifyPendingShutdown()101     void notifyPendingShutdown() override {
102       if (enableNotifyPendingShutdown_) {
103         auto ew = folly::make_exception_wrapper<AcceptorException>(
104             AcceptorException::ExceptionType::SHUTDOWN_PENDING,
105             "shutdown_pending");
106         pipeline_->readException(ew);
107       }
108     }
109 
closeWhenIdle()110     void closeWhenIdle() override {}
111     void dropConnection(const std::string& /* errorMsg */ = "") override {
112       auto ew = folly::make_exception_wrapper<AcceptorException>(
113           AcceptorException::ExceptionType::DROPPED, "dropped");
114       pipeline_->readException(ew);
115     }
dumpConnectionState(uint8_t)116     void dumpConnectionState(uint8_t /* loglevel */) override {}
117 
deletePipeline(wangle::PipelineBase * p)118     void deletePipeline(wangle::PipelineBase* p) override {
119       CHECK(p == pipeline_.get());
120       destroy();
121     }
122 
init()123     void init() {
124       pipeline_->transportActive();
125     }
126 
refreshTimeout()127     void refreshTimeout() override {
128       resetTimeout();
129     }
130 
setNotifyPendingShutdown(bool isEnabled)131     void setNotifyPendingShutdown(bool isEnabled) {
132       enableNotifyPendingShutdown_ = isEnabled;
133     }
134 
135    private:
~ServerConnection()136     ~ServerConnection() override {
137       pipeline_->setPipelineManager(nullptr);
138     }
139     typename Pipeline::Ptr pipeline_;
140     bool enableNotifyPendingShutdown_{false};
141   };
142 
ServerAcceptor(std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,const ServerSocketConfig & accConfig)143   explicit ServerAcceptor(
144       std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,
145       std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,
146       const ServerSocketConfig& accConfig)
147       : Acceptor(accConfig),
148         acceptPipelineFactory_(acceptPipelineFactory),
149         childPipelineFactory_(childPipelineFactory) {}
150 
151   void init(
152       folly::AsyncServerSocket* serverSocket,
153       folly::EventBase* eventBase,
154       SSLStats* stats = nullptr,
155       std::shared_ptr<const fizz::server::FizzServerContext> fizzCtx =
156           nullptr) override {
157     Acceptor::init(serverSocket, eventBase, stats, fizzCtx);
158 
159     acceptPipeline_ = acceptPipelineFactory_->newPipeline(this);
160 
161     if (childPipelineFactory_) {
162       // This means a custom AcceptPipelineFactory was not passed in via
163       // pipeline() and we're using the DefaultAcceptPipelineFactory.
164       // Add the default inbound handler here.
165       acceptPipeline_->addBack(this);
166     }
167     acceptPipeline_->finalize();
168   }
169 
setNotifyPendingShutdown(bool isEnabled)170   void setNotifyPendingShutdown(bool isEnabled) {
171     enableNotifyPendingShutdown_ = isEnabled;
172   }
173 
read(Context *,AcceptPipelineType conn)174   void read(Context*, AcceptPipelineType conn) override {
175     if (conn.type() != typeid(ConnInfo&)) {
176       return;
177     }
178 
179     auto connInfo = boost::get<ConnInfo&>(conn);
180     folly::AsyncTransport::UniquePtr transport(connInfo.sock);
181 
182     // Setup local and remote addresses
183     auto tInfoPtr = std::make_shared<TransportInfo>(connInfo.tinfo);
184     tInfoPtr->localAddr =
185         std::make_shared<folly::SocketAddress>(accConfig_.bindAddress);
186     transport->getLocalAddress(tInfoPtr->localAddr.get());
187     tInfoPtr->remoteAddr =
188         std::make_shared<folly::SocketAddress>(*connInfo.clientAddr);
189     tInfoPtr->appProtocol =
190         std::make_shared<std::string>(connInfo.nextProtoName);
191 
192     auto pipeline = childPipelineFactory_->newPipeline(
193         std::shared_ptr<folly::AsyncTransport>(
194             transport.release(), folly::DelayedDestruction::Destructor()));
195     pipeline->setTransportInfo(tInfoPtr);
196     auto connection = new ServerConnection(std::move(pipeline));
197     connection->setNotifyPendingShutdown(enableNotifyPendingShutdown_);
198     Acceptor::addConnection(connection);
199     connection->init();
200   }
201 
202   // Null implementation to terminate the call in this handler
203   // and suppress warnings
readEOF(Context *)204   void readEOF(Context*) override {}
readException(Context *,folly::exception_wrapper)205   void readException(Context*, folly::exception_wrapper) override {}
206 
207   /* See Acceptor::onNewConnection for details */
onNewConnection(folly::AsyncTransport::UniquePtr transport,const folly::SocketAddress * clientAddr,const std::string & nextProtocolName,SecureTransportType secureTransportType,const TransportInfo & tinfo)208   void onNewConnection(
209       folly::AsyncTransport::UniquePtr transport,
210       const folly::SocketAddress* clientAddr,
211       const std::string& nextProtocolName,
212       SecureTransportType secureTransportType,
213       const TransportInfo& tinfo) override {
214     ConnInfo connInfo = {
215         transport.release(),
216         clientAddr,
217         nextProtocolName,
218         secureTransportType,
219         tinfo};
220     acceptPipeline_->read(connInfo);
221   }
222 
223   // notify the acceptors in the acceptPipeline to drain & drop conns
acceptStopped()224   void acceptStopped() noexcept override {
225     auto ew = folly::make_exception_wrapper<AcceptorException>(
226         AcceptorException::ExceptionType::ACCEPT_STOPPED,
227         "graceful shutdown timeout");
228 
229     acceptPipeline_->readException(ew);
230     Acceptor::acceptStopped();
231   }
232 
drainConnections(double pct)233   void drainConnections(double pct) noexcept override {
234     auto ew = folly::make_exception_wrapper<AcceptorException>(
235         AcceptorException::ExceptionType::DRAIN_CONN_PCT,
236         "draining some connections",
237         pct);
238 
239     acceptPipeline_->readException(ew);
240     Acceptor::drainConnections(pct);
241   }
242 
dropConnections(double pct)243   void dropConnections(double pct) noexcept override {
244     auto ew = folly::make_exception_wrapper<AcceptorException>(
245         AcceptorException::ExceptionType::DROP_CONN_PCT,
246         "dropping some connections",
247         pct);
248 
249     acceptPipeline_->readException(ew);
250     Acceptor::dropConnections(pct);
251   }
252 
forceStop()253   void forceStop() noexcept override {
254     auto ew = folly::make_exception_wrapper<AcceptorException>(
255         AcceptorException::ExceptionType::FORCE_STOP, "hard shutdown timeout");
256 
257     acceptPipeline_->readException(ew);
258     Acceptor::forceStop();
259   }
260 
261   // UDP thunk
onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> socket,const folly::SocketAddress & addr,std::unique_ptr<folly::IOBuf> buf,bool,OnDataAvailableParams)262   void onDataAvailable(
263       std::shared_ptr<folly::AsyncUDPSocket> socket,
264       const folly::SocketAddress& addr,
265       std::unique_ptr<folly::IOBuf> buf,
266       bool /* truncated */,
267       OnDataAvailableParams /* params */) noexcept override {
268     acceptPipeline_->read(
269         AcceptPipelineType(make_tuple(buf.release(), socket, addr)));
270   }
271 
onConnectionAdded(const ManagedConnection *)272   void onConnectionAdded(const ManagedConnection*) override {
273     acceptPipeline_->read(ConnEvent::CONN_ADDED);
274   }
275 
onConnectionRemoved(const ManagedConnection *)276   void onConnectionRemoved(const ManagedConnection*) override {
277     acceptPipeline_->read(ConnEvent::CONN_REMOVED);
278   }
279 
sslConnectionError(const folly::exception_wrapper & ex)280   void sslConnectionError(const folly::exception_wrapper& ex) override {
281     acceptPipeline_->readException(ex);
282     Acceptor::sslConnectionError(ex);
283   }
284 
285  protected:
286   std::shared_ptr<AcceptPipeline> acceptPipeline_;
287   bool enableNotifyPendingShutdown_{false};
288 
289  private:
290   std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory_;
291   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
292 };
293 
294 template <typename Pipeline>
295 class ServerAcceptorFactory : public AcceptorFactory {
296  public:
ServerAcceptorFactory(std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,const ServerSocketConfig & accConfig)297   explicit ServerAcceptorFactory(
298       std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,
299       std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,
300       const ServerSocketConfig& accConfig)
301       : acceptPipelineFactory_(acceptPipelineFactory),
302         childPipelineFactory_(childPipelineFactory),
303         accConfig_(accConfig) {}
304 
enableSharedSSLContext(bool enable)305   virtual void enableSharedSSLContext(bool enable) {
306     if (enable) {
307       sharedSSLContextManager_ =
308           std::make_shared<SharedSSLContextManagerImpl<FizzConfigUtil>>(
309               accConfig_);
310     }
311   }
312 
newAcceptor(folly::EventBase * base)313   std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) override {
314     auto acceptor = std::make_shared<ServerAcceptor<Pipeline>>(
315         acceptPipelineFactory_, childPipelineFactory_, accConfig_);
316 
317     if (sharedSSLContextManager_) {
318       acceptor->setFizzCertManager(sharedSSLContextManager_->getCertManager());
319       acceptor->setSSLContextManager(
320           sharedSSLContextManager_->getContextManager());
321       acceptor->init(
322           nullptr, base, nullptr, sharedSSLContextManager_->getFizzContext());
323       sharedSSLContextManager_->addAcceptor(acceptor);
324     } else {
325       acceptor->init(nullptr, base, nullptr);
326     }
327     return acceptor;
328   }
329 
getSharedSSLContextManager()330   std::shared_ptr<SharedSSLContextManager> getSharedSSLContextManager() const {
331     return sharedSSLContextManager_;
332   }
333 
334  private:
335   std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory_;
336   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
337   ServerSocketConfig accConfig_;
338   std::shared_ptr<SharedSSLContextManager> sharedSSLContextManager_;
339 };
340 
341 class ServerWorkerPool : public folly::ThreadPoolExecutor::Observer {
342  public:
ServerWorkerPool(std::shared_ptr<AcceptorFactory> acceptorFactory,folly::IOThreadPoolExecutor * exec,std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets,std::shared_ptr<ServerSocketFactory> socketFactory)343   explicit ServerWorkerPool(
344       std::shared_ptr<AcceptorFactory> acceptorFactory,
345       folly::IOThreadPoolExecutor* exec,
346       std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>
347           sockets,
348       std::shared_ptr<ServerSocketFactory> socketFactory)
349       : workers_(std::make_shared<WorkerMap>()),
350         workersMutex_(std::make_shared<Mutex>()),
351         acceptorFactory_(acceptorFactory),
352         exec_(exec),
353         sockets_(sockets),
354         socketFactory_(socketFactory) {
355     CHECK(exec);
356   }
357 
358   template <typename F>
359   void forEachWorker(F&& f) const;
360 
361   void threadStarted(folly::ThreadPoolExecutor::ThreadHandle*) override;
362   void threadStopped(folly::ThreadPoolExecutor::ThreadHandle*) override;
threadPreviouslyStarted(folly::ThreadPoolExecutor::ThreadHandle * thread)363   void threadPreviouslyStarted(
364       folly::ThreadPoolExecutor::ThreadHandle* thread) override {
365     threadStarted(thread);
366   }
threadNotYetStopped(folly::ThreadPoolExecutor::ThreadHandle * thread)367   void threadNotYetStopped(
368       folly::ThreadPoolExecutor::ThreadHandle* thread) override {
369     threadStopped(thread);
370   }
371 
372  private:
373   using WorkerMap = std::
374       map<folly::ThreadPoolExecutor::ThreadHandle*, std::shared_ptr<Acceptor>>;
375   using Mutex = folly::SharedMutexReadPriority;
376 
377   std::shared_ptr<WorkerMap> workers_;
378   std::shared_ptr<Mutex> workersMutex_;
379   std::shared_ptr<AcceptorFactory> acceptorFactory_;
380   folly::IOThreadPoolExecutor* exec_{nullptr};
381   std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>
382       sockets_;
383   std::shared_ptr<ServerSocketFactory> socketFactory_;
384 };
385 
386 template <typename F>
forEachWorker(F && f)387 void ServerWorkerPool::forEachWorker(F&& f) const {
388   Mutex::ReadHolder holder(workersMutex_.get());
389   for (const auto& kv : *workers_) {
390     f(kv.second.get());
391   }
392 }
393 
394 class DefaultAcceptPipelineFactory : public AcceptPipelineFactory {
395  public:
newPipeline(Acceptor *)396   typename AcceptPipeline::Ptr newPipeline(Acceptor*) override {
397     return AcceptPipeline::create();
398   }
399 };
400 
401 } // namespace wangle
402