1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <folly/synchronization/Baton.h> 20 #include <wangle/bootstrap/ServerBootstrap-inl.h> 21 #include <wangle/channel/Pipeline.h> 22 #include <iostream> 23 #include <thread> 24 25 namespace wangle { 26 27 /* 28 * ServerBootstrap is a parent class intended to set up a 29 * high-performance TCP accepting server. It will manage a pool of 30 * accepting threads, any number of accepting sockets, a pool of 31 * IO-worker threads, and connection pool for each IO thread for you. 32 * 33 * The output is given as a Pipeline template: given a 34 * PipelineFactory, it will create a new pipeline for each connection, 35 * and your server can handle the incoming bytes. 36 * 37 * BACKWARDS COMPATIBLITY: for servers already taking a pool of 38 * Acceptor objects, an AcceptorFactory can be given directly instead 39 * of a pipeline factory. 40 */ 41 template <typename Pipeline = wangle::DefaultPipeline> 42 class ServerBootstrap { 43 public: 44 ServerBootstrap(const ServerBootstrap& that) = delete; 45 ServerBootstrap(ServerBootstrap&& that) = default; 46 47 ServerBootstrap() = default; 48 ~ServerBootstrap()49 ~ServerBootstrap() { 50 stop(); 51 join(); 52 } 53 54 /* 55 * Pipeline used to add connections to event bases. 56 * This is used for UDP or for load balancing 57 * TCP connections to IO threads explicitly 58 */ pipeline(std::shared_ptr<AcceptPipelineFactory> factory)59 ServerBootstrap* pipeline(std::shared_ptr<AcceptPipelineFactory> factory) { 60 acceptPipelineFactory_ = factory; 61 return this; 62 } 63 channelFactory(std::shared_ptr<ServerSocketFactory> factory)64 ServerBootstrap* channelFactory( 65 std::shared_ptr<ServerSocketFactory> factory) { 66 socketFactory_ = factory; 67 return this; 68 } 69 acceptorConfig(const ServerSocketConfig & accConfig)70 ServerBootstrap* acceptorConfig(const ServerSocketConfig& accConfig) { 71 accConfig_ = accConfig; 72 return this; 73 } 74 75 /* 76 * BACKWARDS COMPATIBILITY - an acceptor factory can be set. Your 77 * Acceptor is responsible for managing the connection pool. 78 * 79 * @param childHandler - acceptor factory to call for each IO thread 80 */ childHandler(std::shared_ptr<AcceptorFactory> h)81 ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) { 82 acceptorFactory_ = h; 83 return this; 84 } 85 86 /* 87 * Set a pipeline factory that will be called for each new connection 88 * 89 * @param factory pipeline factory to use for each new connection 90 */ childPipeline(std::shared_ptr<PipelineFactory<Pipeline>> factory)91 ServerBootstrap* childPipeline( 92 std::shared_ptr<PipelineFactory<Pipeline>> factory) { 93 childPipelineFactory_ = factory; 94 return this; 95 } 96 97 /* 98 * Set the IO executor. If not set, a default one will be created 99 * with one thread per core. 100 * 101 * @param io_group - io executor to use for IO threads. 102 */ group(std::shared_ptr<folly::IOThreadPoolExecutor> io_group)103 ServerBootstrap* group( 104 std::shared_ptr<folly::IOThreadPoolExecutor> io_group) { 105 return group(nullptr, io_group); 106 } 107 108 /* 109 * Set the acceptor executor, and IO executor. 110 * 111 * If no acceptor executor is set, a single thread will be created for accepts 112 * If no IO executor is set, a default of one thread per core will be created 113 * 114 * @param group - acceptor executor to use for acceptor threads. 115 * @param io_group - io executor to use for IO threads. 116 */ group(std::shared_ptr<folly::IOThreadPoolExecutor> accept_group,std::shared_ptr<folly::IOThreadPoolExecutor> io_group)117 ServerBootstrap* group( 118 std::shared_ptr<folly::IOThreadPoolExecutor> accept_group, 119 std::shared_ptr<folly::IOThreadPoolExecutor> io_group) { 120 if (!accept_group) { 121 accept_group = std::make_shared<folly::IOThreadPoolExecutor>( 122 1, std::make_shared<folly::NamedThreadFactory>("Acceptor Thread")); 123 } 124 if (!io_group) { 125 auto threads = std::thread::hardware_concurrency(); 126 if (threads <= 0) { 127 // Reasonable mid-point for concurrency when actual value unknown 128 threads = 8; 129 } 130 io_group = std::make_shared<folly::IOThreadPoolExecutor>( 131 threads, std::make_shared<folly::NamedThreadFactory>("IO Thread")); 132 } 133 134 // TODO better config checking 135 // CHECK(acceptorFactory_ || childPipelineFactory_); 136 CHECK(!(acceptorFactory_ && childPipelineFactory_)); 137 138 if (acceptorFactory_) { 139 workerFactory_ = std::make_shared<ServerWorkerPool>( 140 acceptorFactory_, io_group.get(), sockets_, socketFactory_); 141 } else { 142 auto acceptorFactory = std::make_shared<ServerAcceptorFactory<Pipeline>>( 143 acceptPipelineFactory_, childPipelineFactory_, accConfig_); 144 acceptorFactory->enableSharedSSLContext(useSharedSSLContextManager_); 145 sharedSSLContextManager_ = acceptorFactory->getSharedSSLContextManager(); 146 workerFactory_ = std::make_shared<ServerWorkerPool>( 147 acceptorFactory, io_group.get(), sockets_, socketFactory_); 148 } 149 150 io_group->addObserver(workerFactory_); 151 152 acceptor_group_ = accept_group; 153 io_group_ = io_group; 154 155 return this; 156 } 157 158 /* 159 * Bind to an existing socket 160 * 161 * @param sock Existing socket to use for accepting 162 */ bind(folly::AsyncServerSocket::UniquePtr s)163 void bind(folly::AsyncServerSocket::UniquePtr s) { 164 if (!workerFactory_) { 165 group(nullptr); 166 } 167 168 // Since only a single socket is given, 169 // we can only accept on a single thread 170 CHECK(acceptor_group_->numThreads() == 1); 171 172 std::shared_ptr<folly::AsyncServerSocket> socket( 173 s.release(), AsyncServerSocketFactory::ThreadSafeDestructor()); 174 socket->setMaxNumMessagesInQueue( 175 accConfig_.maxNumPendingConnectionsPerWorker); 176 177 folly::via(acceptor_group_.get(), [&] { 178 if (useZeroCopy_) { 179 socket->setZeroCopy(true); 180 } 181 socket->attachEventBase(folly::EventBaseManager::get()->getEventBase()); 182 socket->listen(socketConfig.acceptBacklog); 183 socket->startAccepting(); 184 }).get(); 185 186 // Startup all the threads 187 workerFactory_->forEachWorker([this, socket](Acceptor* worker) { 188 socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait( 189 [this, worker, socket]() { 190 socketFactory_->addAcceptCB(socket, worker, worker->getEventBase()); 191 }); 192 }); 193 194 sockets_->push_back(socket); 195 } 196 bind(folly::SocketAddress & address)197 void bind(folly::SocketAddress& address) { 198 bindImpl(address); 199 } 200 201 /* 202 * Bind to a port and start listening. 203 * One of childPipeline or childHandler must be called before bind 204 * 205 * @param port Port to listen on 206 */ bind(int port)207 void bind(int port) { 208 CHECK(port >= 0); 209 folly::SocketAddress address; 210 address.setFromLocalPort(port); 211 bindImpl(address); 212 } 213 bindImpl(folly::SocketAddress & address)214 void bindImpl(folly::SocketAddress& address) { 215 if (!workerFactory_) { 216 group(nullptr); 217 } 218 219 bool reusePort = reusePort_ || (acceptor_group_->numThreads() > 1) || 220 accConfig_.reusePort; 221 222 std::mutex sock_lock; 223 std::vector<std::shared_ptr<folly::AsyncSocketBase>> new_sockets; 224 225 std::exception_ptr exn; 226 227 auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier) { 228 try { 229 auto socket = socketFactory_->newSocket( 230 address, socketConfig.acceptBacklog, reusePort, socketConfig); 231 sock_lock.lock(); 232 new_sockets.push_back(socket); 233 sock_lock.unlock(); 234 socket->getAddress(&address); 235 236 barrier->post(); 237 } catch (...) { 238 exn = std::current_exception(); 239 barrier->post(); 240 241 return; 242 } 243 }; 244 245 auto wait0 = std::make_shared<folly::Baton<>>(); 246 acceptor_group_->add(std::bind(startupFunc, wait0)); 247 wait0->wait(); 248 249 for (size_t i = 1; i < acceptor_group_->numThreads(); i++) { 250 auto barrier = std::make_shared<folly::Baton<>>(); 251 acceptor_group_->add(std::bind(startupFunc, barrier)); 252 barrier->wait(); 253 } 254 255 if (exn) { 256 std::rethrow_exception(exn); 257 } 258 259 for (auto& socket : new_sockets) { 260 // Startup all the threads 261 workerFactory_->forEachWorker([this, socket](Acceptor* worker) { 262 socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait( 263 [this, worker, socket]() { 264 socketFactory_->addAcceptCB( 265 socket, worker, worker->getEventBase()); 266 }); 267 }); 268 269 sockets_->push_back(socket); 270 } 271 } 272 273 /* 274 * Stop listening on all sockets. 275 */ stop()276 void stop() { 277 // sockets_ may be null if ServerBootstrap has been std::move'd 278 if (sockets_) { 279 sockets_->clear(); 280 } 281 if (!stopped_) { 282 stopped_ = true; 283 // stopBaton_ may be null if ServerBootstrap has been std::move'd 284 if (stopBaton_) { 285 stopBaton_->post(); 286 } 287 } 288 } 289 join()290 void join() { 291 if (acceptor_group_) { 292 acceptor_group_->join(); 293 } 294 if (io_group_) { 295 io_group_->join(); 296 } 297 } 298 waitForStop()299 void waitForStop() { 300 if (stopBaton_) { 301 stopBaton_->wait(); 302 stopBaton_.reset(); 303 } 304 CHECK(stopped_); 305 } 306 307 /* 308 * Get the list of listening sockets 309 */ getSockets()310 const std::vector<std::shared_ptr<folly::AsyncSocketBase>>& getSockets() 311 const { 312 return *sockets_; 313 } 314 getSharedSSLContextManager()315 std::shared_ptr<SharedSSLContextManager> getSharedSSLContextManager() const { 316 return sharedSSLContextManager_; 317 } 318 getIOGroup()319 std::shared_ptr<folly::IOThreadPoolExecutor> getIOGroup() const { 320 return io_group_; 321 } 322 323 template <typename F> forEachWorker(F && f)324 void forEachWorker(F&& f) const { 325 if (!workerFactory_) { 326 return; 327 } 328 workerFactory_->forEachWorker(f); 329 } 330 331 ServerSocketConfig socketConfig; 332 setReusePort(bool reusePort)333 ServerBootstrap* setReusePort(bool reusePort) { 334 reusePort_ = reusePort; 335 return this; 336 } 337 setUseSharedSSLContextManager(bool enabled)338 ServerBootstrap* setUseSharedSSLContextManager(bool enabled) { 339 useSharedSSLContextManager_ = enabled; 340 return this; 341 } 342 343 private: 344 std::shared_ptr<folly::IOThreadPoolExecutor> acceptor_group_; 345 std::shared_ptr<folly::IOThreadPoolExecutor> io_group_; 346 std::shared_ptr<SharedSSLContextManager> sharedSSLContextManager_; 347 348 std::shared_ptr<ServerWorkerPool> workerFactory_; 349 std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> 350 sockets_{std::make_shared< 351 std::vector<std::shared_ptr<folly::AsyncSocketBase>>>()}; 352 353 std::shared_ptr<AcceptorFactory> acceptorFactory_; 354 std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_; 355 std::shared_ptr<AcceptPipelineFactory> acceptPipelineFactory_{ 356 std::make_shared<DefaultAcceptPipelineFactory>()}; 357 std::shared_ptr<ServerSocketFactory> socketFactory_{ 358 std::make_shared<AsyncServerSocketFactory>()}; 359 360 ServerSocketConfig accConfig_; 361 362 bool reusePort_{false}; 363 364 std::unique_ptr<folly::Baton<>> stopBaton_{ 365 std::make_unique<folly::Baton<>>()}; 366 bool stopped_{false}; 367 bool useZeroCopy_{false}; 368 bool useSharedSSLContextManager_{false}; 369 }; 370 371 } // namespace wangle 372