1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 *
7 */
8
9 #include <quic/server/QuicServer.h>
10
11 #include <folly/Random.h>
12 #include <folly/io/async/EventBaseManager.h>
13 #include <quic/codec/DefaultConnectionIdAlgo.h>
14 #include <quic/codec/QuicHeaderCodec.h>
15 #include <iterator>
16 #ifdef CCP_ENABLED
17 #include <quic/congestion_control/third_party/ccp/libstartccp.h>
18 #endif
19 #include <quic/server/CCPReader.h>
20 #include <quic/server/QuicReusePortUDPSocketFactory.h>
21 #include <quic/server/QuicServerTransport.h>
22 #include <quic/server/QuicSharedUDPSocketFactory.h>
23 #include <quic/server/SlidingWindowRateLimiter.h>
24
25 DEFINE_bool(
26 qs_io_uring_use_async_recv,
27 true,
28 "io_uring backend use async recv");
29
30 namespace quic {
31 namespace {
32 // Determine which worker to route to
33 // This **MUST** be kept in sync with the BPF program (if supplied)
getWorkerToRouteTo(const RoutingData & routingData,size_t numWorkers,ConnectionIdAlgo * connIdAlgo)34 size_t getWorkerToRouteTo(
35 const RoutingData& routingData,
36 size_t numWorkers,
37 ConnectionIdAlgo* connIdAlgo) {
38 return connIdAlgo->parseConnectionId(routingData.destinationConnId)
39 ->workerId %
40 numWorkers;
41 }
42 } // namespace
43
QuicServer()44 QuicServer::QuicServer() {
45 listenerSocketFactory_ = std::make_unique<QuicReusePortUDPSocketFactory>();
46 socketFactory_ = std::make_unique<QuicSharedUDPSocketFactory>();
47 }
48
setQuicServerTransportFactory(std::unique_ptr<QuicServerTransportFactory> factory)49 void QuicServer::setQuicServerTransportFactory(
50 std::unique_ptr<QuicServerTransportFactory> factory) {
51 transportFactory_ = std::move(factory);
52 }
53
setQuicUDPSocketFactory(std::unique_ptr<QuicUDPSocketFactory> factory)54 void QuicServer::setQuicUDPSocketFactory(
55 std::unique_ptr<QuicUDPSocketFactory> factory) {
56 socketFactory_ = std::move(factory);
57 }
58
setListenerSocketFactory(std::unique_ptr<QuicUDPSocketFactory> factory)59 void QuicServer::setListenerSocketFactory(
60 std::unique_ptr<QuicUDPSocketFactory> factory) {
61 listenerSocketFactory_ = std::move(factory);
62 }
63
setCongestionControllerFactory(std::shared_ptr<CongestionControllerFactory> ccFactory)64 void QuicServer::setCongestionControllerFactory(
65 std::shared_ptr<CongestionControllerFactory> ccFactory) {
66 CHECK(!initialized_)
67 << " Congestion Control Factorty must be set before the server is "
68 << "initialized.";
69 CHECK(ccFactory);
70 ccFactory_ = std::move(ccFactory);
71 }
72
setRateLimit(std::function<uint64_t ()> count,std::chrono::seconds window)73 void QuicServer::setRateLimit(
74 std::function<uint64_t()> count,
75 std::chrono::seconds window) {
76 rateLimit_ = folly::make_optional<RateLimit>(std::move(count), window);
77 }
78
setSupportedVersion(const std::vector<QuicVersion> & versions)79 void QuicServer::setSupportedVersion(const std::vector<QuicVersion>& versions) {
80 supportedVersions_ = versions;
81 }
82
setProcessId(ProcessId id)83 void QuicServer::setProcessId(ProcessId id) noexcept {
84 processId_ = id;
85 }
86
getProcessId() const87 ProcessId QuicServer::getProcessId() const noexcept {
88 return processId_;
89 }
90
isInitialized() const91 bool QuicServer::isInitialized() const noexcept {
92 return initialized_;
93 }
94
start(const folly::SocketAddress & address,size_t maxWorkers)95 void QuicServer::start(const folly::SocketAddress& address, size_t maxWorkers) {
96 CHECK(ctx_) << "Must set a TLS context for the Quic server";
97 CHECK_LE(maxWorkers, std::numeric_limits<uint8_t>::max());
98 size_t numCpu = std::thread::hardware_concurrency();
99 if (maxWorkers == 0) {
100 maxWorkers = numCpu;
101 }
102 auto numWorkers = std::min(numCpu, maxWorkers);
103 std::vector<folly::EventBase*> evbs;
104 for (size_t i = 0; i < numWorkers; ++i) {
105 auto scopedEvb = std::make_unique<folly::ScopedEventBaseThread>(
106 folly::EventBase::Options().setBackendFactory(
107 [] { return getEventBaseBackend(); }),
108 nullptr,
109 "");
110 workerEvbs_.push_back(std::move(scopedEvb));
111 if (evbObserver_) {
112 workerEvbs_.back()->getEventBase()->runInEventBaseThreadAndWait([&] {
113 workerEvbs_.back()->getEventBase()->setObserver(evbObserver_);
114 });
115 }
116 auto workerEvb = workerEvbs_.back()->getEventBase();
117 evbs.push_back(workerEvb);
118 }
119 initialize(address, evbs, true /* useDefaultTransport */);
120 start();
121 }
122
initialize(const folly::SocketAddress & address,const std::vector<folly::EventBase * > & evbs,bool useDefaultTransport)123 void QuicServer::initialize(
124 const folly::SocketAddress& address,
125 const std::vector<folly::EventBase*>& evbs,
126 bool useDefaultTransport) {
127 CHECK(!evbs.empty());
128 CHECK_LE(evbs.size(), std::numeric_limits<uint8_t>::max())
129 << "Quic Server does not support more than "
130 << std::numeric_limits<uint8_t>::max() << " workers";
131 CHECK(shutdown_);
132 shutdown_ = false;
133
134 // setting default stateless reset token if not set
135 if (!transportSettings_.statelessResetTokenSecret) {
136 std::array<uint8_t, kStatelessResetTokenSecretLength> secret;
137 folly::Random::secureRandom(secret.data(), secret.size());
138 transportSettings_.statelessResetTokenSecret = secret;
139 }
140
141 // it the connid algo factory is not set, use default impl
142 if (!connIdAlgoFactory_) {
143 connIdAlgoFactory_ = std::make_unique<DefaultConnectionIdAlgoFactory>();
144 connIdAlgo_ = connIdAlgoFactory_->make();
145 } else {
146 connIdAlgo_ = connIdAlgoFactory_->make();
147 }
148 if (!ccFactory_) {
149 ccFactory_ = std::make_shared<ServerCongestionControllerFactory>();
150 }
151
152 initializeWorkers(evbs, useDefaultTransport);
153 bindWorkersToSocket(address, evbs);
154 }
155
initializeWorkers(const std::vector<folly::EventBase * > & evbs,bool useDefaultTransport)156 void QuicServer::initializeWorkers(
157 const std::vector<folly::EventBase*>& evbs,
158 bool useDefaultTransport) {
159 CHECK(workers_.empty());
160 // iterate in the order of insertion in vector
161 for (size_t i = 0; i < evbs.size(); ++i) {
162 auto workerEvb = evbs[i];
163 auto worker = newWorkerWithoutSocket();
164 if (useDefaultTransport) {
165 CHECK(transportFactory_) << "Transport factory is not set";
166 worker->setTransportFactory(transportFactory_.get());
167 worker->setFizzContext(ctx_);
168 }
169 if (healthCheckToken_) {
170 worker->setHealthCheckToken(*healthCheckToken_);
171 }
172 if (transportStatsFactory_) {
173 workerEvb->runInEventBaseThread(
174 [self = this->shared_from_this(),
175 workerPtr = worker.get(),
176 transportStatsFactory = transportStatsFactory_.get()] {
177 if (self->shutdown_) {
178 return;
179 }
180 auto statsCallback = transportStatsFactory->make();
181 CHECK(statsCallback);
182 workerPtr->setTransportStatsCallback(std::move(statsCallback));
183 });
184 }
185 worker->setConnectionIdAlgo(connIdAlgoFactory_->make());
186 worker->setCongestionControllerFactory(ccFactory_);
187 if (rateLimit_) {
188 worker->setRateLimiter(std::make_unique<SlidingWindowRateLimiter>(
189 rateLimit_->count, rateLimit_->window));
190 }
191 worker->setWorkerId(i);
192 worker->setTransportSettingsOverrideFn(transportSettingsOverrideFn_);
193 workers_.push_back(std::move(worker));
194 evbToWorkers_.emplace(workerEvb, workers_.back().get());
195 }
196 }
197
newWorkerWithoutSocket()198 std::unique_ptr<QuicServerWorker> QuicServer::newWorkerWithoutSocket() {
199 auto worker = std::make_unique<QuicServerWorker>(
200 this->shared_from_this(), FLAGS_qs_io_uring_use_async_recv);
201 worker->setNewConnectionSocketFactory(socketFactory_.get());
202 worker->setSupportedVersions(supportedVersions_);
203 worker->setTransportSettings(transportSettings_);
204 worker->rejectNewConnections(rejectNewConnections_);
205 worker->setProcessId(processId_);
206 worker->setHostId(hostId_);
207 worker->setConnectionIdVersion(cidVersion_);
208 return worker;
209 }
210
bindWorkersToSocket(const folly::SocketAddress & address,const std::vector<folly::EventBase * > & evbs)211 void QuicServer::bindWorkersToSocket(
212 const folly::SocketAddress& address,
213 const std::vector<folly::EventBase*>& evbs) {
214 auto numWorkers = evbs.size();
215 CHECK(!initialized_);
216 boundAddress_ = address;
217 auto usingCCP = isUsingCCP();
218 if (!usingCCP) {
219 LOG(INFO) << "NOT using CCP";
220 }
221 auto ccpInitFailed = false;
222 for (size_t i = 0; i < numWorkers; ++i) {
223 auto workerEvb = evbs[i];
224 workerEvb->runImmediatelyOrRunInEventBaseThreadAndWait(
225 [self = this->shared_from_this(),
226 workerEvb,
227 numWorkers,
228 usingCCP,
229 &ccpInitFailed,
230 processId = processId_,
231 idx = i] {
232 std::lock_guard<std::mutex> guard(self->startMutex_);
233 if (self->shutdown_) {
234 return;
235 }
236 auto workerSocket = self->listenerSocketFactory_->make(workerEvb, -1);
237 auto it = self->evbToWorkers_.find(workerEvb);
238 CHECK(it != self->evbToWorkers_.end());
239 auto worker = it->second;
240 int takeoverOverFd = -1;
241 if (self->listeningFDs_.size() > idx) {
242 takeoverOverFd = self->listeningFDs_[idx];
243 }
244 worker->setSocketOptions(&self->socketOptions_);
245 // dup the takenover socket on only one worker and bind the rest
246 if (takeoverOverFd >= 0) {
247 workerSocket->setFD(
248 folly::NetworkSocket::fromFd(::dup(takeoverOverFd)),
249 // set ownership to OWNS to allow ::close()'ing of of the fd
250 // when this server goes away
251 folly::AsyncUDPSocket::FDOwnership::OWNS);
252 worker->setSocket(std::move(workerSocket));
253 if (idx == 0) {
254 self->boundAddress_ = worker->getAddress();
255 }
256 VLOG(4) << "Set up dup()'ed fd for address=" << self->boundAddress_
257 << " on workerId=" << (int)worker->getWorkerId();
258 worker->applyAllSocketOptions();
259 } else {
260 VLOG(4) << "No valid takenover fd found for address="
261 << self->boundAddress_ << ". binding on worker=" << worker
262 << " workerId=" << (int)worker->getWorkerId()
263 << " processId=" << (int)processId;
264 worker->setSocket(std::move(workerSocket));
265 worker->bind(self->boundAddress_, self->bindOptions_);
266 if (idx == 0) {
267 self->boundAddress_ = worker->getAddress();
268 }
269 }
270 if (usingCCP) {
271 auto serverId = self->boundAddress_.getIPAddress().hash() |
272 self->boundAddress_.getPort();
273 try {
274 worker->getCcpReader()->try_initialize(
275 worker->getEventBase(),
276 self->ccpId_,
277 serverId,
278 worker->getWorkerId());
279 worker->getCcpReader()->connect();
280 } catch (const folly::AsyncSocketException& ex) {
281 // probably means the unix socket failed to bind
282 LOG(ERROR) << "exception while initializing ccp: " << ex.what()
283 << "\nshutting down...";
284 // TODO also update counters
285 ccpInitFailed = true;
286 } catch (const std::exception& ex) {
287 LOG(ERROR) << "exception initializing ccp: " << ex.what()
288 << "\nshutting down...";
289 ccpInitFailed = true;
290 }
291 }
292 if (idx == (numWorkers - 1)) {
293 VLOG(4) << "Initialized all workers in the eventbase";
294 self->initialized_ = true;
295 self->startCv_.notify_all();
296 }
297 });
298 }
299 if (usingCCP && ccpInitFailed) {
300 shutdown();
301 }
302 }
303
start()304 void QuicServer::start() {
305 CHECK(initialized_);
306 // initialize the thread local ptr to workers
307 runOnAllWorkers([&](auto worker) mutable {
308 // pass in no-op deleter to ThreadLocalPtr since the destruction of
309 // QuicServerWorker is managed by the QuicServer
310 workerPtr_.reset(
311 worker, [](auto /* worker */, folly::TLPDestructionMode) {});
312 });
313 auto usingCCP = isUsingCCP();
314 for (auto& worker : workers_) {
315 worker->getEventBase()->runInEventBaseThread([&worker, usingCCP] {
316 if (usingCCP) {
317 worker->getCcpReader()->start();
318 }
319 worker->start();
320 });
321 }
322 }
323
allowBeingTakenOver(const folly::SocketAddress & addr)324 void QuicServer::allowBeingTakenOver(const folly::SocketAddress& addr) {
325 // synchronously bind workers to takeover handler port.
326 // This method should not be called from a worker
327 CHECK(!workers_.empty());
328 CHECK(!shutdown_);
329
330 // this function shouldn't be called from worker's thread
331 for (auto& worker : workers_) {
332 DCHECK(
333 // if the eventbase is not running, it returns true for isInEvbThread()
334 !worker->getEventBase()->isRunning() ||
335 !worker->getEventBase()->isInEventBaseThread());
336 }
337 // TODO workers_ (vector) is not protected against concurrent modifications
338 auto numWorkers = workers_.size();
339 for (size_t i = 0; i < numWorkers; ++i) {
340 auto workerEvb = workers_[i]->getEventBase();
341 workerEvb->runInEventBaseThreadAndWait([&] {
342 std::lock_guard<std::mutex> guard(startMutex_);
343 CHECK(initialized_);
344 auto localListenSocket = listenerSocketFactory_->make(workerEvb, -1);
345 auto it = evbToWorkers_.find(workerEvb);
346 CHECK(it != evbToWorkers_.end());
347 auto worker = it->second;
348 worker->allowBeingTakenOver(std::move(localListenSocket), addr);
349 });
350 }
351 VLOG(4) << "Bind all workers in the eventbase to takeover handler port";
352 takeoverHandlerInitialized_ = true;
353 }
354
overrideTakeoverHandlerAddress(const folly::SocketAddress & addr)355 folly::SocketAddress QuicServer::overrideTakeoverHandlerAddress(
356 const folly::SocketAddress& addr) {
357 // synchronously bind workers to takeover handler port.
358 // This method should not be called from a worker
359 CHECK(!workers_.empty());
360 CHECK(!shutdown_);
361 CHECK(takeoverHandlerInitialized_) << "TakeoverHanders are not initialized. ";
362
363 // this function shouldn't be called from worker's thread
364 for (auto& worker : workers_) {
365 DCHECK(
366 // if the eventbase is not running, it returns true for isInEvbThread()
367 !worker->getEventBase()->isRunning() ||
368 !worker->getEventBase()->isInEventBaseThread());
369 }
370 folly::SocketAddress boundAddress;
371 for (auto& worker : workers_) {
372 worker->getEventBase()->runInEventBaseThreadAndWait([&] {
373 std::lock_guard<std::mutex> guard(startMutex_);
374 CHECK(initialized_);
375 auto workerEvb = worker->getEventBase();
376 auto localListenSocket = listenerSocketFactory_->make(workerEvb, -1);
377 boundAddress = worker->overrideTakeoverHandlerAddress(
378 std::move(localListenSocket), addr);
379 });
380 }
381 return boundAddress;
382 }
383
pauseRead()384 void QuicServer::pauseRead() {
385 runOnAllWorkersSync([&](auto worker) mutable { worker->pauseRead(); });
386 }
387
routeDataToWorker(const folly::SocketAddress & client,RoutingData && routingData,NetworkData && networkData,folly::Optional<QuicVersion> quicVersion,bool isForwardedData)388 void QuicServer::routeDataToWorker(
389 const folly::SocketAddress& client,
390 RoutingData&& routingData,
391 NetworkData&& networkData,
392 folly::Optional<QuicVersion> quicVersion,
393 bool isForwardedData) {
394 // figure out worker idx
395 if (!initialized_) {
396 // drop the packet if we are not initialized. This is a janky memory
397 // barrier.
398 VLOG(4) << "Dropping data since quic-server is not initialized";
399 if (workerPtr_) {
400 QUIC_STATS(
401 workerPtr_->getTransportStatsCallback(),
402 onPacketDropped,
403 QuicTransportStatsCallback::PacketDropReason::WORKER_NOT_INITIALIZED);
404 }
405 return;
406 }
407
408 if (shutdown_) {
409 VLOG(4) << "Dropping data since quic server is shutdown";
410 if (workerPtr_) {
411 QUIC_STATS(
412 workerPtr_->getTransportStatsCallback(),
413 onPacketDropped,
414 QuicTransportStatsCallback::PacketDropReason::SERVER_SHUTDOWN);
415 }
416 return;
417 }
418
419 // For initial or zeroRtt packets, pick the worker that kernel / bpf routed to
420 // Without this, when (bpf / kernel) hash and userspace hash get out of sync
421 // (e.g. due to shuffling of sockets in the hash ring), it results in
422 // very high amount of 'misses'
423 if (routingData.isUsingClientConnId && workerPtr_) {
424 CHECK(workerPtr_->getEventBase()->isInEventBaseThread());
425 workerPtr_->dispatchPacketData(
426 client,
427 std::move(routingData),
428 std::move(networkData),
429 quicVersion,
430 isForwardedData);
431 return;
432 }
433
434 auto workerToRunOn =
435 getWorkerToRouteTo(routingData, workers_.size(), connIdAlgo_.get());
436 auto& worker = workers_[workerToRunOn];
437 VLOG_IF(4, !worker->getEventBase()->isInEventBaseThread())
438 << " Routing to worker in different EVB, to workerId=" << workerToRunOn;
439 folly::EventBase* workerEvb = worker->getEventBase();
440 bool isInEvb = workerEvb->isInEventBaseThread();
441 if (isInEvb) {
442 worker->dispatchPacketData(
443 client,
444 std::move(routingData),
445 std::move(networkData),
446 quicVersion,
447 isForwardedData);
448 return;
449 }
450 worker->getEventBase()->runInEventBaseThread([server =
451 this->shared_from_this(),
452 cl = client,
453 routingData =
454 std::move(routingData),
455 w = worker.get(),
456 buf = std::move(networkData),
457 isForwarded = isForwardedData,
458 quicVersion]() mutable {
459 if (server->shutdown_) {
460 return;
461 }
462 w->dispatchPacketData(
463 cl, std::move(routingData), std::move(buf), quicVersion, isForwarded);
464 });
465 }
466
handleWorkerError(LocalErrorCode error)467 void QuicServer::handleWorkerError(LocalErrorCode error) {
468 shutdown(error);
469 }
470
waitUntilInitialized()471 void QuicServer::waitUntilInitialized() {
472 std::unique_lock<std::mutex> guard(startMutex_);
473 if (shutdown_ || initialized_) {
474 return;
475 }
476 for (auto& worker : workers_) {
477 DCHECK(!worker->getEventBase()->isInEventBaseThread());
478 }
479 startCv_.wait(guard, [&] { return initialized_ || shutdown_; });
480 }
481
~QuicServer()482 QuicServer::~QuicServer() {
483 shutdown(LocalErrorCode::SHUTTING_DOWN);
484 }
485
shutdown(LocalErrorCode error)486 void QuicServer::shutdown(LocalErrorCode error) {
487 if (shutdown_) {
488 return;
489 }
490 shutdown_ = true;
491 auto usingCCP = isUsingCCP();
492 for (auto& worker : workers_) {
493 worker->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
494 worker->shutdownAllConnections(error);
495 if (usingCCP) {
496 worker->getCcpReader()->shutdown();
497 }
498 workerPtr_.reset();
499 });
500 // protecting the erase in map with the mutex since
501 // the erase could potentally affect concurrent accesses from other threads
502 std::lock_guard<std::mutex> guard(startMutex_);
503 evbToWorkers_.erase(worker->getEventBase());
504 evbToAcceptors_.erase(worker->getEventBase());
505 }
506 startCv_.notify_all();
507 }
508
hasShutdown() const509 bool QuicServer::hasShutdown() const noexcept {
510 return shutdown_;
511 }
512
runOnAllWorkers(const std::function<void (QuicServerWorker *)> & func)513 void QuicServer::runOnAllWorkers(
514 const std::function<void(QuicServerWorker*)>& func) {
515 std::lock_guard<std::mutex> guard(startMutex_);
516 if (shutdown_) {
517 return;
518 }
519 for (auto& worker : workers_) {
520 worker->getEventBase()->runInEventBaseThread(
521 [&worker, self = this->shared_from_this(), func]() mutable {
522 if (self->shutdown_) {
523 return;
524 }
525 func(worker.get());
526 });
527 }
528 }
529
runOnAllWorkersSync(const std::function<void (QuicServerWorker *)> & func)530 void QuicServer::runOnAllWorkersSync(
531 const std::function<void(QuicServerWorker*)>& func) {
532 std::lock_guard<std::mutex> guard(startMutex_);
533 if (shutdown_) {
534 return;
535 }
536 for (auto& worker : workers_) {
537 worker->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
538 [&worker, self = this->shared_from_this(), func]() mutable {
539 if (self->shutdown_) {
540 return;
541 }
542 func(worker.get());
543 });
544 }
545 }
546
setHostId(uint32_t hostId)547 void QuicServer::setHostId(uint32_t hostId) noexcept {
548 CHECK(!initialized_) << "Host id must be set before initializing Quic server";
549 hostId_ = hostId;
550 }
551
setConnectionIdVersion(ConnectionIdVersion cidVersion)552 void QuicServer::setConnectionIdVersion(
553 ConnectionIdVersion cidVersion) noexcept {
554 CHECK(!initialized_)
555 << "ConnectionIdVersion must be set before initializing Quic server";
556 cidVersion_ = cidVersion;
557 }
558
setTransportSettingsOverrideFn(TransportSettingsOverrideFn fn)559 void QuicServer::setTransportSettingsOverrideFn(
560 TransportSettingsOverrideFn fn) {
561 CHECK(!initialized_) << "Transport settings override function must be"
562 << "set before initializing Quic server";
563 transportSettingsOverrideFn_ = std::move(fn);
564 }
565
setHealthCheckToken(const std::string & healthCheckToken)566 void QuicServer::setHealthCheckToken(const std::string& healthCheckToken) {
567 // Make sure the token satisfies the required properties, i.e. it is not a
568 // valid quic header.
569 auto parsed = parseHeader(*folly::IOBuf::copyBuffer(healthCheckToken));
570 CHECK(!parsed.hasValue());
571 CHECK_GT(healthCheckToken.size(), kMinHealthCheckTokenSize);
572 healthCheckToken_ = healthCheckToken;
573 runOnAllWorkers([healthCheckToken](auto worker) mutable {
574 worker->setHealthCheckToken(healthCheckToken);
575 });
576 }
577
setFizzContext(std::shared_ptr<const fizz::server::FizzServerContext> ctx)578 void QuicServer::setFizzContext(
579 std::shared_ptr<const fizz::server::FizzServerContext> ctx) {
580 ctx_ = ctx;
581 runOnAllWorkers([ctx](auto worker) mutable { worker->setFizzContext(ctx); });
582 }
583
setFizzContext(folly::EventBase * evb,std::shared_ptr<const fizz::server::FizzServerContext> ctx)584 void QuicServer::setFizzContext(
585 folly::EventBase* evb,
586 std::shared_ptr<const fizz::server::FizzServerContext> ctx) {
587 CHECK(evb);
588 CHECK(ctx);
589 evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
590 std::lock_guard<std::mutex> guard(startMutex_);
591 if (shutdown_) {
592 return;
593 }
594 auto it = evbToWorkers_.find(evb);
595 CHECK(it != evbToWorkers_.end());
596 it->second->setFizzContext(ctx);
597 });
598 }
599
getTransportSettings() const600 const TransportSettings& QuicServer::getTransportSettings() const noexcept {
601 return transportSettings_;
602 }
603
setTransportSettings(TransportSettings transportSettings)604 void QuicServer::setTransportSettings(TransportSettings transportSettings) {
605 transportSettings_ = transportSettings;
606 runOnAllWorkers([transportSettings](auto worker) mutable {
607 worker->setTransportSettings(transportSettings);
608 });
609 }
610
setCcpId(uint64_t ccpId)611 void QuicServer::setCcpId(uint64_t ccpId) {
612 ccpId_ = ccpId;
613 }
614
isUsingCCP()615 bool QuicServer::isUsingCCP() {
616 auto foundId = ccpId_ != 0;
617 #ifdef CCP_ENABLED
618 auto default_is_ccp = transportSettings_.defaultCongestionController ==
619 CongestionControlType::CCP;
620 if (foundId && default_is_ccp) {
621 return true;
622 } else {
623 return false;
624 }
625 return foundId;
626 #else
627 if (foundId) {
628 LOG(ERROR)
629 << "found ccp id, but server was not compiled with ccp support. recompile with -DCCP_ENABLED.";
630 }
631 return false;
632 #endif
633 }
634
rejectNewConnections(std::function<bool ()> rejectFn)635 void QuicServer::rejectNewConnections(std::function<bool()> rejectFn) {
636 rejectNewConnections_ = rejectFn;
637 runOnAllWorkers([rejectFn](auto worker) mutable {
638 worker->rejectNewConnections(rejectFn);
639 });
640 }
641
blockListedSrcPort(std::function<bool (uint16_t)> isBlockListedSrcPort)642 void QuicServer::blockListedSrcPort(
643 std::function<bool(uint16_t)> isBlockListedSrcPort) {
644 isBlockListedSrcPort_ = isBlockListedSrcPort;
645 runOnAllWorkers([isBlockListedSrcPort](auto worker) mutable {
646 worker->setIsBlockListedSrcPort(isBlockListedSrcPort);
647 });
648 }
649
startPacketForwarding(const folly::SocketAddress & destAddr)650 void QuicServer::startPacketForwarding(const folly::SocketAddress& destAddr) {
651 if (initialized_) {
652 runOnAllWorkersSync([destAddr](auto worker) mutable {
653 worker->startPacketForwarding(destAddr);
654 });
655 }
656 }
657
stopPacketForwarding(std::chrono::milliseconds delay)658 void QuicServer::stopPacketForwarding(std::chrono::milliseconds delay) {
659 std::lock_guard<std::mutex> guard(startMutex_);
660 if (!initialized_ || shutdown_) {
661 return;
662 }
663 for (auto& worker : workers_) {
664 worker->getEventBase()->runInEventBaseThreadAndWait(
665 [&worker, self = this->shared_from_this(), delay]() mutable {
666 if (self->shutdown_) {
667 return;
668 }
669 worker->getEventBase()->runAfterDelay(
670 [&worker, self]() mutable {
671 if (worker && !self->shutdown_) {
672 worker->stopPacketForwarding();
673 }
674 },
675 delay.count());
676 });
677 }
678 }
679
setTransportStatsCallbackFactory(std::unique_ptr<QuicTransportStatsCallbackFactory> statsFactory)680 void QuicServer::setTransportStatsCallbackFactory(
681 std::unique_ptr<QuicTransportStatsCallbackFactory> statsFactory) {
682 CHECK(statsFactory);
683 transportStatsFactory_ = std::move(statsFactory);
684 }
685
setConnectionIdAlgoFactory(std::unique_ptr<ConnectionIdAlgoFactory> connIdAlgoFactory)686 void QuicServer::setConnectionIdAlgoFactory(
687 std::unique_ptr<ConnectionIdAlgoFactory> connIdAlgoFactory) {
688 CHECK(!initialized_);
689 CHECK(connIdAlgoFactory);
690 connIdAlgoFactory_ = std::move(connIdAlgoFactory);
691 }
692
addTransportFactory(folly::EventBase * evb,QuicServerTransportFactory * acceptor)693 void QuicServer::addTransportFactory(
694 folly::EventBase* evb,
695 QuicServerTransportFactory* acceptor) {
696 CHECK(evb);
697 CHECK(acceptor);
698 evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
699 std::lock_guard<std::mutex> guard(startMutex_);
700 if (shutdown_) {
701 return;
702 }
703 evbToAcceptors_.emplace(evb, acceptor);
704 auto it = evbToWorkers_.find(evb);
705 if (it != evbToWorkers_.end()) {
706 it->second->setTransportFactory(acceptor);
707 } else {
708 VLOG(3) << "Couldn't find associated worker for the given eventbase";
709 }
710 });
711 }
712
getAddress() const713 const folly::SocketAddress& QuicServer::getAddress() const {
714 CHECK(initialized_) << "Quic server is not initialized. "
715 << "Consider calling waitUntilInitialized() before this ";
716 return boundAddress_;
717 }
718
setListeningFDs(const std::vector<int> & fds)719 void QuicServer::setListeningFDs(const std::vector<int>& fds) {
720 std::lock_guard<std::mutex> guard(startMutex_);
721 listeningFDs_ = fds;
722 }
723
getListeningSocketFD() const724 int QuicServer::getListeningSocketFD() const {
725 CHECK(initialized_) << "Quic server is not initialized. "
726 << "Consider calling waitUntilInitialized() before this ";
727 return workers_[0]->getFD();
728 }
729
getAllListeningSocketFDs() const730 std::vector<int> QuicServer::getAllListeningSocketFDs() const noexcept {
731 CHECK(initialized_) << "Quic server is not initialized. "
732 << "Consider calling waitUntilInitialized() before this ";
733 std::vector<int> sockets(workers_.size());
734 for (const auto& worker : workers_) {
735 if (worker->getFD() != -1) {
736 CHECK_LT(worker->getWorkerId(), workers_.size());
737 sockets.at(worker->getWorkerId()) = worker->getFD();
738 }
739 }
740 return sockets;
741 }
742
getAllConnectionsStats(std::vector<QuicConnectionStats> & stats)743 void QuicServer::getAllConnectionsStats(
744 std::vector<QuicConnectionStats>& stats) {
745 runOnAllWorkersSync(
746 [&stats](auto worker) mutable { worker->getAllConnectionsStats(stats); });
747 }
748
getTakeoverProtocolVersion() const749 TakeoverProtocolVersion QuicServer::getTakeoverProtocolVersion()
750 const noexcept {
751 return workers_[0]->getTakeoverProtocolVersion();
752 }
753
getTakeoverHandlerSocketFD() const754 int QuicServer::getTakeoverHandlerSocketFD() const {
755 CHECK(takeoverHandlerInitialized_) << "TakeoverHanders are not initialized. ";
756 return workers_[0]->getTakeoverHandlerSocketFD();
757 }
758
getWorkerEvbs() const759 std::vector<folly::EventBase*> QuicServer::getWorkerEvbs() const noexcept {
760 CHECK(initialized_) << "Quic server is not initialized. ";
761 std::vector<folly::EventBase*> ebvs;
762 for (const auto& worker : workers_) {
763 ebvs.push_back(worker->getEventBase());
764 }
765 return ebvs;
766 }
767
addAcceptObserver(folly::EventBase * evb,AcceptObserver * observer)768 bool QuicServer::addAcceptObserver(
769 folly::EventBase* evb,
770 AcceptObserver* observer) {
771 CHECK(initialized_);
772 CHECK(evb);
773 bool success = false;
774 evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
775 std::lock_guard<std::mutex> guard(startMutex_);
776 if (shutdown_) {
777 return;
778 }
779 auto it = evbToWorkers_.find(evb);
780 if (it != evbToWorkers_.end()) {
781 it->second->addAcceptObserver(observer);
782 success = true;
783 } else {
784 VLOG(3) << "Couldn't find associated worker for the given eventbase, "
785 << "unable to add AcceptObserver";
786 success = false;
787 }
788 });
789 return success;
790 }
791
removeAcceptObserver(folly::EventBase * evb,AcceptObserver * observer)792 bool QuicServer::removeAcceptObserver(
793 folly::EventBase* evb,
794 AcceptObserver* observer) {
795 CHECK(initialized_);
796 CHECK(evb);
797 bool success = false;
798 evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
799 std::lock_guard<std::mutex> guard(startMutex_);
800 if (shutdown_) {
801 return;
802 }
803 auto it = evbToWorkers_.find(evb);
804 if (it != evbToWorkers_.end()) {
805 success = it->second->removeAcceptObserver(observer);
806 } else {
807 VLOG(3) << "Couldn't find associated worker for the given eventbase, "
808 << "unable to remove AcceptObserver";
809 success = false;
810 }
811 });
812 return success;
813 }
814
815 } // namespace quic
816