1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SharedMutex.h>
21 #include <folly/executors/IOThreadPoolExecutor.h>
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/io/async/EventBaseManager.h>
24 #include <wangle/acceptor/Acceptor.h>
25 #include <wangle/acceptor/ManagedConnection.h>
26 #include <wangle/acceptor/SharedSSLContextManager.h>
27 #include <wangle/bootstrap/ServerSocketFactory.h>
28 #include <wangle/channel/Handler.h>
29 #include <wangle/channel/Pipeline.h>
30 #include <wangle/ssl/SSLStats.h>
31
32 namespace wangle {
33
34 class AcceptorException : public std::runtime_error {
35 public:
36 enum class ExceptionType {
37 UNKNOWN = 0,
38 TIMED_OUT = 1,
39 DROPPED = 2,
40 ACCEPT_STOPPED = 3,
41 DRAIN_CONN_PCT = 4,
42 DROP_CONN_PCT = 5,
43 FORCE_STOP = 6,
44 INTERNAL_ERROR = 7,
45 ACCEPT_PAUSED = 8,
46 ACCEPT_RESUMED = 9,
47 SHUTDOWN_PENDING = 10, // A graceful shutdown has been scheduled.
48 };
49
AcceptorException(ExceptionType type)50 explicit AcceptorException(ExceptionType type)
51 : std::runtime_error(""), type_(type), pct_(0.0) {}
52
AcceptorException(ExceptionType type,const std::string & message)53 explicit AcceptorException(ExceptionType type, const std::string& message)
54 : std::runtime_error(message), type_(type), pct_(0.0) {}
55
AcceptorException(ExceptionType type,const std::string & message,double pct)56 explicit AcceptorException(
57 ExceptionType type,
58 const std::string& message,
59 double pct)
60 : std::runtime_error(message), type_(type), pct_(pct) {}
61
getType()62 ExceptionType getType() const noexcept {
63 return type_;
64 }
getPct()65 double getPct() const noexcept {
66 return pct_;
67 }
68
69 protected:
70 const ExceptionType type_;
71 // the percentage of connections to be drained or dropped during the shutdown
72 const double pct_;
73 };
74
75 template <typename Pipeline>
76 class ServerAcceptor : public Acceptor,
77 public wangle::InboundHandler<AcceptPipelineType> {
78 public:
79 using OnDataAvailableParams =
80 folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams;
81
82 class ServerConnection : public wangle::ManagedConnection,
83 public wangle::PipelineManager {
84 public:
ServerConnection(typename Pipeline::Ptr pipeline)85 explicit ServerConnection(typename Pipeline::Ptr pipeline)
86 : pipeline_(std::move(pipeline)) {
87 pipeline_->setPipelineManager(this);
88 }
89
timeoutExpired()90 void timeoutExpired() noexcept override {
91 auto ew = folly::make_exception_wrapper<AcceptorException>(
92 AcceptorException::ExceptionType::TIMED_OUT, "timeout");
93 pipeline_->readException(ew);
94 }
95
describe(std::ostream &)96 void describe(std::ostream&) const override {}
isBusy()97 bool isBusy() const override {
98 return true;
99 }
100
notifyPendingShutdown()101 void notifyPendingShutdown() override {
102 if (enableNotifyPendingShutdown_) {
103 auto ew = folly::make_exception_wrapper<AcceptorException>(
104 AcceptorException::ExceptionType::SHUTDOWN_PENDING,
105 "shutdown_pending");
106 pipeline_->readException(ew);
107 }
108 }
109
closeWhenIdle()110 void closeWhenIdle() override {}
111 void dropConnection(const std::string& /* errorMsg */ = "") override {
112 auto ew = folly::make_exception_wrapper<AcceptorException>(
113 AcceptorException::ExceptionType::DROPPED, "dropped");
114 pipeline_->readException(ew);
115 }
dumpConnectionState(uint8_t)116 void dumpConnectionState(uint8_t /* loglevel */) override {}
117
deletePipeline(wangle::PipelineBase * p)118 void deletePipeline(wangle::PipelineBase* p) override {
119 CHECK(p == pipeline_.get());
120 destroy();
121 }
122
init()123 void init() {
124 pipeline_->transportActive();
125 }
126
refreshTimeout()127 void refreshTimeout() override {
128 resetTimeout();
129 }
130
setNotifyPendingShutdown(bool isEnabled)131 void setNotifyPendingShutdown(bool isEnabled) {
132 enableNotifyPendingShutdown_ = isEnabled;
133 }
134
135 private:
~ServerConnection()136 ~ServerConnection() override {
137 pipeline_->setPipelineManager(nullptr);
138 }
139 typename Pipeline::Ptr pipeline_;
140 bool enableNotifyPendingShutdown_{false};
141 };
142
ServerAcceptor(std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,const ServerSocketConfig & accConfig)143 explicit ServerAcceptor(
144 std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,
145 std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,
146 const ServerSocketConfig& accConfig)
147 : Acceptor(accConfig),
148 acceptPipelineFactory_(acceptPipelineFactory),
149 childPipelineFactory_(childPipelineFactory) {}
150
151 void init(
152 folly::AsyncServerSocket* serverSocket,
153 folly::EventBase* eventBase,
154 SSLStats* stats = nullptr,
155 std::shared_ptr<const fizz::server::FizzServerContext> fizzCtx =
156 nullptr) override {
157 Acceptor::init(serverSocket, eventBase, stats, fizzCtx);
158
159 acceptPipeline_ = acceptPipelineFactory_->newPipeline(this);
160
161 if (childPipelineFactory_) {
162 // This means a custom AcceptPipelineFactory was not passed in via
163 // pipeline() and we're using the DefaultAcceptPipelineFactory.
164 // Add the default inbound handler here.
165 acceptPipeline_->addBack(this);
166 }
167 acceptPipeline_->finalize();
168 }
169
setNotifyPendingShutdown(bool isEnabled)170 void setNotifyPendingShutdown(bool isEnabled) {
171 enableNotifyPendingShutdown_ = isEnabled;
172 }
173
read(Context *,AcceptPipelineType conn)174 void read(Context*, AcceptPipelineType conn) override {
175 if (conn.type() != typeid(ConnInfo&)) {
176 return;
177 }
178
179 auto connInfo = boost::get<ConnInfo&>(conn);
180 folly::AsyncTransport::UniquePtr transport(connInfo.sock);
181
182 // Setup local and remote addresses
183 auto tInfoPtr = std::make_shared<TransportInfo>(connInfo.tinfo);
184 tInfoPtr->localAddr =
185 std::make_shared<folly::SocketAddress>(accConfig_.bindAddress);
186 transport->getLocalAddress(tInfoPtr->localAddr.get());
187 tInfoPtr->remoteAddr =
188 std::make_shared<folly::SocketAddress>(*connInfo.clientAddr);
189 tInfoPtr->appProtocol =
190 std::make_shared<std::string>(connInfo.nextProtoName);
191
192 auto pipeline = childPipelineFactory_->newPipeline(
193 std::shared_ptr<folly::AsyncTransport>(
194 transport.release(), folly::DelayedDestruction::Destructor()));
195 pipeline->setTransportInfo(tInfoPtr);
196 auto connection = new ServerConnection(std::move(pipeline));
197 connection->setNotifyPendingShutdown(enableNotifyPendingShutdown_);
198 Acceptor::addConnection(connection);
199 connection->init();
200 }
201
202 // Null implementation to terminate the call in this handler
203 // and suppress warnings
readEOF(Context *)204 void readEOF(Context*) override {}
readException(Context *,folly::exception_wrapper)205 void readException(Context*, folly::exception_wrapper) override {}
206
207 /* See Acceptor::onNewConnection for details */
onNewConnection(folly::AsyncTransport::UniquePtr transport,const folly::SocketAddress * clientAddr,const std::string & nextProtocolName,SecureTransportType secureTransportType,const TransportInfo & tinfo)208 void onNewConnection(
209 folly::AsyncTransport::UniquePtr transport,
210 const folly::SocketAddress* clientAddr,
211 const std::string& nextProtocolName,
212 SecureTransportType secureTransportType,
213 const TransportInfo& tinfo) override {
214 ConnInfo connInfo = {
215 transport.release(),
216 clientAddr,
217 nextProtocolName,
218 secureTransportType,
219 tinfo};
220 acceptPipeline_->read(connInfo);
221 }
222
223 // notify the acceptors in the acceptPipeline to drain & drop conns
acceptStopped()224 void acceptStopped() noexcept override {
225 auto ew = folly::make_exception_wrapper<AcceptorException>(
226 AcceptorException::ExceptionType::ACCEPT_STOPPED,
227 "graceful shutdown timeout");
228
229 acceptPipeline_->readException(ew);
230 Acceptor::acceptStopped();
231 }
232
drainConnections(double pct)233 void drainConnections(double pct) noexcept override {
234 auto ew = folly::make_exception_wrapper<AcceptorException>(
235 AcceptorException::ExceptionType::DRAIN_CONN_PCT,
236 "draining some connections",
237 pct);
238
239 acceptPipeline_->readException(ew);
240 Acceptor::drainConnections(pct);
241 }
242
dropConnections(double pct)243 void dropConnections(double pct) noexcept override {
244 auto ew = folly::make_exception_wrapper<AcceptorException>(
245 AcceptorException::ExceptionType::DROP_CONN_PCT,
246 "dropping some connections",
247 pct);
248
249 acceptPipeline_->readException(ew);
250 Acceptor::dropConnections(pct);
251 }
252
forceStop()253 void forceStop() noexcept override {
254 auto ew = folly::make_exception_wrapper<AcceptorException>(
255 AcceptorException::ExceptionType::FORCE_STOP, "hard shutdown timeout");
256
257 acceptPipeline_->readException(ew);
258 Acceptor::forceStop();
259 }
260
261 // UDP thunk
onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> socket,const folly::SocketAddress & addr,std::unique_ptr<folly::IOBuf> buf,bool,OnDataAvailableParams)262 void onDataAvailable(
263 std::shared_ptr<folly::AsyncUDPSocket> socket,
264 const folly::SocketAddress& addr,
265 std::unique_ptr<folly::IOBuf> buf,
266 bool /* truncated */,
267 OnDataAvailableParams /* params */) noexcept override {
268 acceptPipeline_->read(
269 AcceptPipelineType(make_tuple(buf.release(), socket, addr)));
270 }
271
onConnectionAdded(const ManagedConnection *)272 void onConnectionAdded(const ManagedConnection*) override {
273 acceptPipeline_->read(ConnEvent::CONN_ADDED);
274 }
275
onConnectionRemoved(const ManagedConnection *)276 void onConnectionRemoved(const ManagedConnection*) override {
277 acceptPipeline_->read(ConnEvent::CONN_REMOVED);
278 }
279
sslConnectionError(const folly::exception_wrapper & ex)280 void sslConnectionError(const folly::exception_wrapper& ex) override {
281 acceptPipeline_->readException(ex);
282 Acceptor::sslConnectionError(ex);
283 }
284
285 protected:
286 std::shared_ptr<AcceptPipeline> acceptPipeline_;
287 bool enableNotifyPendingShutdown_{false};
288
289 private:
290 std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory_;
291 std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
292 };
293
294 template <typename Pipeline>
295 class ServerAcceptorFactory : public AcceptorFactory {
296 public:
ServerAcceptorFactory(std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,const ServerSocketConfig & accConfig)297 explicit ServerAcceptorFactory(
298 std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory,
299 std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory,
300 const ServerSocketConfig& accConfig)
301 : acceptPipelineFactory_(acceptPipelineFactory),
302 childPipelineFactory_(childPipelineFactory),
303 accConfig_(accConfig) {}
304
enableSharedSSLContext(bool enable)305 virtual void enableSharedSSLContext(bool enable) {
306 if (enable) {
307 sharedSSLContextManager_ =
308 std::make_shared<SharedSSLContextManagerImpl<FizzConfigUtil>>(
309 accConfig_);
310 }
311 }
312
newAcceptor(folly::EventBase * base)313 std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) override {
314 auto acceptor = std::make_shared<ServerAcceptor<Pipeline>>(
315 acceptPipelineFactory_, childPipelineFactory_, accConfig_);
316
317 if (sharedSSLContextManager_) {
318 acceptor->setFizzCertManager(sharedSSLContextManager_->getCertManager());
319 acceptor->setSSLContextManager(
320 sharedSSLContextManager_->getContextManager());
321 acceptor->init(
322 nullptr, base, nullptr, sharedSSLContextManager_->getFizzContext());
323 sharedSSLContextManager_->addAcceptor(acceptor);
324 } else {
325 acceptor->init(nullptr, base, nullptr);
326 }
327 return acceptor;
328 }
329
getSharedSSLContextManager()330 std::shared_ptr<SharedSSLContextManager> getSharedSSLContextManager() const {
331 return sharedSSLContextManager_;
332 }
333
334 private:
335 std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory_;
336 std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
337 ServerSocketConfig accConfig_;
338 std::shared_ptr<SharedSSLContextManager> sharedSSLContextManager_;
339 };
340
341 class ServerWorkerPool : public folly::ThreadPoolExecutor::Observer {
342 public:
ServerWorkerPool(std::shared_ptr<AcceptorFactory> acceptorFactory,folly::IOThreadPoolExecutor * exec,std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets,std::shared_ptr<ServerSocketFactory> socketFactory)343 explicit ServerWorkerPool(
344 std::shared_ptr<AcceptorFactory> acceptorFactory,
345 folly::IOThreadPoolExecutor* exec,
346 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>
347 sockets,
348 std::shared_ptr<ServerSocketFactory> socketFactory)
349 : workers_(std::make_shared<WorkerMap>()),
350 workersMutex_(std::make_shared<Mutex>()),
351 acceptorFactory_(acceptorFactory),
352 exec_(exec),
353 sockets_(sockets),
354 socketFactory_(socketFactory) {
355 CHECK(exec);
356 }
357
358 template <typename F>
359 void forEachWorker(F&& f) const;
360
361 void threadStarted(folly::ThreadPoolExecutor::ThreadHandle*) override;
362 void threadStopped(folly::ThreadPoolExecutor::ThreadHandle*) override;
threadPreviouslyStarted(folly::ThreadPoolExecutor::ThreadHandle * thread)363 void threadPreviouslyStarted(
364 folly::ThreadPoolExecutor::ThreadHandle* thread) override {
365 threadStarted(thread);
366 }
threadNotYetStopped(folly::ThreadPoolExecutor::ThreadHandle * thread)367 void threadNotYetStopped(
368 folly::ThreadPoolExecutor::ThreadHandle* thread) override {
369 threadStopped(thread);
370 }
371
372 private:
373 using WorkerMap = std::
374 map<folly::ThreadPoolExecutor::ThreadHandle*, std::shared_ptr<Acceptor>>;
375 using Mutex = folly::SharedMutexReadPriority;
376
377 std::shared_ptr<WorkerMap> workers_;
378 std::shared_ptr<Mutex> workersMutex_;
379 std::shared_ptr<AcceptorFactory> acceptorFactory_;
380 folly::IOThreadPoolExecutor* exec_{nullptr};
381 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>
382 sockets_;
383 std::shared_ptr<ServerSocketFactory> socketFactory_;
384 };
385
386 template <typename F>
forEachWorker(F && f)387 void ServerWorkerPool::forEachWorker(F&& f) const {
388 Mutex::ReadHolder holder(workersMutex_.get());
389 for (const auto& kv : *workers_) {
390 f(kv.second.get());
391 }
392 }
393
394 class DefaultAcceptPipelineFactory : public AcceptPipelineFactory {
395 public:
newPipeline(Acceptor *)396 typename AcceptPipeline::Ptr newPipeline(Acceptor*) override {
397 return AcceptPipeline::create();
398 }
399 };
400
401 } // namespace wangle
402