1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <folly/synchronization/Baton.h>
#include <wangle/bootstrap/ServerBootstrap-inl.h>
#include <wangle/channel/Pipeline.h>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>
24 
25 namespace wangle {
26 
27 /*
28  * ServerBootstrap is a parent class intended to set up a
29  * high-performance TCP accepting server.  It will manage a pool of
30  * accepting threads, any number of accepting sockets, a pool of
31  * IO-worker threads, and connection pool for each IO thread for you.
32  *
33  * The output is given as a Pipeline template: given a
34  * PipelineFactory, it will create a new pipeline for each connection,
35  * and your server can handle the incoming bytes.
36  *
37  * BACKWARDS COMPATIBLITY: for servers already taking a pool of
38  * Acceptor objects, an AcceptorFactory can be given directly instead
39  * of a pipeline factory.
40  */
41 template <typename Pipeline = wangle::DefaultPipeline>
42 class ServerBootstrap {
43  public:
44   ServerBootstrap(const ServerBootstrap& that) = delete;
45   ServerBootstrap(ServerBootstrap&& that) = default;
46 
47   ServerBootstrap() = default;
48 
~ServerBootstrap()49   ~ServerBootstrap() {
50     stop();
51     join();
52   }
53 
54   /*
55    * Pipeline used to add connections to event bases.
56    * This is used for UDP or for load balancing
57    * TCP connections to IO threads explicitly
58    */
pipeline(std::shared_ptr<AcceptPipelineFactory> factory)59   ServerBootstrap* pipeline(std::shared_ptr<AcceptPipelineFactory> factory) {
60     acceptPipelineFactory_ = factory;
61     return this;
62   }
63 
channelFactory(std::shared_ptr<ServerSocketFactory> factory)64   ServerBootstrap* channelFactory(
65       std::shared_ptr<ServerSocketFactory> factory) {
66     socketFactory_ = factory;
67     return this;
68   }
69 
acceptorConfig(const ServerSocketConfig & accConfig)70   ServerBootstrap* acceptorConfig(const ServerSocketConfig& accConfig) {
71     accConfig_ = accConfig;
72     return this;
73   }
74 
75   /*
76    * BACKWARDS COMPATIBILITY - an acceptor factory can be set.  Your
77    * Acceptor is responsible for managing the connection pool.
78    *
79    * @param childHandler - acceptor factory to call for each IO thread
80    */
childHandler(std::shared_ptr<AcceptorFactory> h)81   ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) {
82     acceptorFactory_ = h;
83     return this;
84   }
85 
86   /*
87    * Set a pipeline factory that will be called for each new connection
88    *
89    * @param factory pipeline factory to use for each new connection
90    */
childPipeline(std::shared_ptr<PipelineFactory<Pipeline>> factory)91   ServerBootstrap* childPipeline(
92       std::shared_ptr<PipelineFactory<Pipeline>> factory) {
93     childPipelineFactory_ = factory;
94     return this;
95   }
96 
97   /*
98    * Set the IO executor.  If not set, a default one will be created
99    * with one thread per core.
100    *
101    * @param io_group - io executor to use for IO threads.
102    */
group(std::shared_ptr<folly::IOThreadPoolExecutor> io_group)103   ServerBootstrap* group(
104       std::shared_ptr<folly::IOThreadPoolExecutor> io_group) {
105     return group(nullptr, io_group);
106   }
107 
108   /*
109    * Set the acceptor executor, and IO executor.
110    *
111    * If no acceptor executor is set, a single thread will be created for accepts
112    * If no IO executor is set, a default of one thread per core will be created
113    *
114    * @param group - acceptor executor to use for acceptor threads.
115    * @param io_group - io executor to use for IO threads.
116    */
group(std::shared_ptr<folly::IOThreadPoolExecutor> accept_group,std::shared_ptr<folly::IOThreadPoolExecutor> io_group)117   ServerBootstrap* group(
118       std::shared_ptr<folly::IOThreadPoolExecutor> accept_group,
119       std::shared_ptr<folly::IOThreadPoolExecutor> io_group) {
120     if (!accept_group) {
121       accept_group = std::make_shared<folly::IOThreadPoolExecutor>(
122           1, std::make_shared<folly::NamedThreadFactory>("Acceptor Thread"));
123     }
124     if (!io_group) {
125       auto threads = std::thread::hardware_concurrency();
126       if (threads <= 0) {
127         // Reasonable mid-point for concurrency when actual value unknown
128         threads = 8;
129       }
130       io_group = std::make_shared<folly::IOThreadPoolExecutor>(
131           threads, std::make_shared<folly::NamedThreadFactory>("IO Thread"));
132     }
133 
134     // TODO better config checking
135     // CHECK(acceptorFactory_ || childPipelineFactory_);
136     CHECK(!(acceptorFactory_ && childPipelineFactory_));
137 
138     if (acceptorFactory_) {
139       workerFactory_ = std::make_shared<ServerWorkerPool>(
140           acceptorFactory_, io_group.get(), sockets_, socketFactory_);
141     } else {
142       auto acceptorFactory = std::make_shared<ServerAcceptorFactory<Pipeline>>(
143           acceptPipelineFactory_, childPipelineFactory_, accConfig_);
144       acceptorFactory->enableSharedSSLContext(useSharedSSLContextManager_);
145       sharedSSLContextManager_ = acceptorFactory->getSharedSSLContextManager();
146       workerFactory_ = std::make_shared<ServerWorkerPool>(
147           acceptorFactory, io_group.get(), sockets_, socketFactory_);
148     }
149 
150     io_group->addObserver(workerFactory_);
151 
152     acceptor_group_ = accept_group;
153     io_group_ = io_group;
154 
155     return this;
156   }
157 
158   /*
159    * Bind to an existing socket
160    *
161    * @param sock Existing socket to use for accepting
162    */
bind(folly::AsyncServerSocket::UniquePtr s)163   void bind(folly::AsyncServerSocket::UniquePtr s) {
164     if (!workerFactory_) {
165       group(nullptr);
166     }
167 
168     // Since only a single socket is given,
169     // we can only accept on a single thread
170     CHECK(acceptor_group_->numThreads() == 1);
171 
172     std::shared_ptr<folly::AsyncServerSocket> socket(
173         s.release(), AsyncServerSocketFactory::ThreadSafeDestructor());
174     socket->setMaxNumMessagesInQueue(
175         accConfig_.maxNumPendingConnectionsPerWorker);
176 
177     folly::via(acceptor_group_.get(), [&] {
178       if (useZeroCopy_) {
179         socket->setZeroCopy(true);
180       }
181       socket->attachEventBase(folly::EventBaseManager::get()->getEventBase());
182       socket->listen(socketConfig.acceptBacklog);
183       socket->startAccepting();
184     }).get();
185 
186     // Startup all the threads
187     workerFactory_->forEachWorker([this, socket](Acceptor* worker) {
188       socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
189           [this, worker, socket]() {
190             socketFactory_->addAcceptCB(socket, worker, worker->getEventBase());
191           });
192     });
193 
194     sockets_->push_back(socket);
195   }
196 
bind(folly::SocketAddress & address)197   void bind(folly::SocketAddress& address) {
198     bindImpl(address);
199   }
200 
201   /*
202    * Bind to a port and start listening.
203    * One of childPipeline or childHandler must be called before bind
204    *
205    * @param port Port to listen on
206    */
bind(int port)207   void bind(int port) {
208     CHECK(port >= 0);
209     folly::SocketAddress address;
210     address.setFromLocalPort(port);
211     bindImpl(address);
212   }
213 
bindImpl(folly::SocketAddress & address)214   void bindImpl(folly::SocketAddress& address) {
215     if (!workerFactory_) {
216       group(nullptr);
217     }
218 
219     bool reusePort = reusePort_ || (acceptor_group_->numThreads() > 1) ||
220         accConfig_.reusePort;
221 
222     std::mutex sock_lock;
223     std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets;
224 
225     std::exception_ptr exn;
226 
227     auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier) {
228       try {
229         auto socket = socketFactory_->newSocket(
230             address, socketConfig.acceptBacklog, reusePort, socketConfig);
231         sock_lock.lock();
232         new_sockets.push_back(socket);
233         sock_lock.unlock();
234         socket->getAddress(&address);
235 
236         barrier->post();
237       } catch (...) {
238         exn = std::current_exception();
239         barrier->post();
240 
241         return;
242       }
243     };
244 
245     auto wait0 = std::make_shared<folly::Baton<>>();
246     acceptor_group_->add(std::bind(startupFunc, wait0));
247     wait0->wait();
248 
249     for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
250       auto barrier = std::make_shared<folly::Baton<>>();
251       acceptor_group_->add(std::bind(startupFunc, barrier));
252       barrier->wait();
253     }
254 
255     if (exn) {
256       std::rethrow_exception(exn);
257     }
258 
259     for (auto& socket : new_sockets) {
260       // Startup all the threads
261       workerFactory_->forEachWorker([this, socket](Acceptor* worker) {
262         socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
263             [this, worker, socket]() {
264               socketFactory_->addAcceptCB(
265                   socket, worker, worker->getEventBase());
266             });
267       });
268 
269       sockets_->push_back(socket);
270     }
271   }
272 
273   /*
274    * Stop listening on all sockets.
275    */
stop()276   void stop() {
277     // sockets_ may be null if ServerBootstrap has been std::move'd
278     if (sockets_) {
279       sockets_->clear();
280     }
281     if (!stopped_) {
282       stopped_ = true;
283       // stopBaton_ may be null if ServerBootstrap has been std::move'd
284       if (stopBaton_) {
285         stopBaton_->post();
286       }
287     }
288   }
289 
join()290   void join() {
291     if (acceptor_group_) {
292       acceptor_group_->join();
293     }
294     if (io_group_) {
295       io_group_->join();
296     }
297   }
298 
waitForStop()299   void waitForStop() {
300     if (stopBaton_) {
301       stopBaton_->wait();
302       stopBaton_.reset();
303     }
304     CHECK(stopped_);
305   }
306 
307   /*
308    * Get the list of listening sockets
309    */
getSockets()310   const std::vector<std::shared_ptr<folly::AsyncSocketBase>>& getSockets()
311       const {
312     return *sockets_;
313   }
314 
getSharedSSLContextManager()315   std::shared_ptr<SharedSSLContextManager> getSharedSSLContextManager() const {
316     return sharedSSLContextManager_;
317   }
318 
getIOGroup()319   std::shared_ptr<folly::IOThreadPoolExecutor> getIOGroup() const {
320     return io_group_;
321   }
322 
323   template <typename F>
forEachWorker(F && f)324   void forEachWorker(F&& f) const {
325     if (!workerFactory_) {
326       return;
327     }
328     workerFactory_->forEachWorker(f);
329   }
330 
331   ServerSocketConfig socketConfig;
332 
setReusePort(bool reusePort)333   ServerBootstrap* setReusePort(bool reusePort) {
334     reusePort_ = reusePort;
335     return this;
336   }
337 
setUseSharedSSLContextManager(bool enabled)338   ServerBootstrap* setUseSharedSSLContextManager(bool enabled) {
339     useSharedSSLContextManager_ = enabled;
340     return this;
341   }
342 
343  private:
344   std::shared_ptr<folly::IOThreadPoolExecutor> acceptor_group_;
345   std::shared_ptr<folly::IOThreadPoolExecutor> io_group_;
346   std::shared_ptr<SharedSSLContextManager> sharedSSLContextManager_;
347 
348   std::shared_ptr<ServerWorkerPool> workerFactory_;
349   std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>>
350       sockets_{std::make_shared<
351           std::vector<std::shared_ptr<folly::AsyncSocketBase>>>()};
352 
353   std::shared_ptr<AcceptorFactory> acceptorFactory_;
354   std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
355   std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory_{
356       std::make_shared<DefaultAcceptPipelineFactory>()};
357   std::shared_ptr<ServerSocketFactory> socketFactory_{
358       std::make_shared<AsyncServerSocketFactory>()};
359 
360   ServerSocketConfig accConfig_;
361 
362   bool reusePort_{false};
363 
364   std::unique_ptr<folly::Baton<>> stopBaton_{
365       std::make_unique<folly::Baton<>>()};
366   bool stopped_{false};
367   bool useZeroCopy_{false};
368   bool useSharedSSLContextManager_{false};
369 };
370 
371 } // namespace wangle
372