/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 *
 */

#include <quic/server/QuicServer.h>

#include <folly/Random.h>
#include <folly/io/async/EventBaseManager.h>
#include <quic/codec/DefaultConnectionIdAlgo.h>
#include <quic/codec/QuicHeaderCodec.h>
#include <iterator>
#ifdef CCP_ENABLED
#include <quic/congestion_control/third_party/ccp/libstartccp.h>
#endif
#include <quic/server/CCPReader.h>
#include <quic/server/QuicReusePortUDPSocketFactory.h>
#include <quic/server/QuicServerTransport.h>
#include <quic/server/QuicSharedUDPSocketFactory.h>
#include <quic/server/SlidingWindowRateLimiter.h>

DEFINE_bool(
    qs_io_uring_use_async_recv,
    true,
    "io_uring backend use async recv");

namespace quic {
namespace {
// Determine which worker to route to
// This **MUST** be kept in sync with the BPF program (if supplied)
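// e.g. a connection id carrying workerId 6 with numWorkers == 4 routes to
// worker 6 % 4 == 2.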
size_t getWorkerToRouteTo(
    const RoutingData& routingData,
    size_t numWorkers,
    ConnectionIdAlgo* connIdAlgo) {
  return connIdAlgo->parseConnectionId(routingData.destinationConnId)
             ->workerId %
      numWorkers;
}
} // namespace

QuicServer::QuicServer() {
  listenerSocketFactory_ = std::make_unique<QuicReusePortUDPSocketFactory>();
  socketFactory_ = std::make_unique<QuicSharedUDPSocketFactory>();
}

void QuicServer::setQuicServerTransportFactory(
    std::unique_ptr<QuicServerTransportFactory> factory) {
  transportFactory_ = std::move(factory);
}

void QuicServer::setQuicUDPSocketFactory(
    std::unique_ptr<QuicUDPSocketFactory> factory) {
  socketFactory_ = std::move(factory);
}

void QuicServer::setListenerSocketFactory(
    std::unique_ptr<QuicUDPSocketFactory> factory) {
  listenerSocketFactory_ = std::move(factory);
}
void QuicServer::setCongestionControllerFactory(
    std::shared_ptr<CongestionControllerFactory> ccFactory) {
  CHECK(!initialized_)
      << " Congestion Control Factory must be set before the server is "
      << "initialized.";
  CHECK(ccFactory);
  ccFactory_ = std::move(ccFactory);
}

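// Rate limiting: initializeWorkers() builds a separate SlidingWindowRateLimiter
// from (count, window) for every worker, so the limit applies per worker rather
// than server-wide. Illustrative sketch only (the values are made up):
//   server->setRateLimit([]() { return 50000; }, std::chrono::seconds(1));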
void QuicServer::setRateLimit(
    std::function<uint64_t()> count,
    std::chrono::seconds window) {
  rateLimit_ = folly::make_optional<RateLimit>(std::move(count), window);
}

void QuicServer::setSupportedVersion(const std::vector<QuicVersion>& versions) {
  supportedVersions_ = versions;
}

void QuicServer::setProcessId(ProcessId id) noexcept {
  processId_ = id;
}

ProcessId QuicServer::getProcessId() const noexcept {
  return processId_;
}

bool QuicServer::isInitialized() const noexcept {
  return initialized_;
}

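// A minimal usage sketch of this class, assuming the server object is held in
// a std::shared_ptr (it relies on shared_from_this()); how the server is
// constructed and the concrete factory objects are up to the caller:
//
//   auto server = /* std::shared_ptr<QuicServer> */ ...;
//   server->setFizzContext(fizzServerCtx);
//   server->setQuicServerTransportFactory(std::move(myTransportFactory));
//   server->start(folly::SocketAddress("::1", 4433), 0 /* maxWorkers: #cpu */);
//   server->waitUntilInitialized();
//   LOG(INFO) << "listening on " << server->getAddress();
//   ...
//   server->shutdown();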
void QuicServer::start(const folly::SocketAddress& address, size_t maxWorkers) {
  CHECK(ctx_) << "Must set a TLS context for the Quic server";
  CHECK_LE(maxWorkers, std::numeric_limits<uint8_t>::max());
  size_t numCpu = std::thread::hardware_concurrency();
  if (maxWorkers == 0) {
    maxWorkers = numCpu;
  }
  auto numWorkers = std::min(numCpu, maxWorkers);
  std::vector<folly::EventBase*> evbs;
  for (size_t i = 0; i < numWorkers; ++i) {
    auto scopedEvb = std::make_unique<folly::ScopedEventBaseThread>(
        folly::EventBase::Options().setBackendFactory(
            [] { return getEventBaseBackend(); }),
        nullptr,
        "");
    workerEvbs_.push_back(std::move(scopedEvb));
    if (evbObserver_) {
      workerEvbs_.back()->getEventBase()->runInEventBaseThreadAndWait([&] {
        workerEvbs_.back()->getEventBase()->setObserver(evbObserver_);
      });
    }
    auto workerEvb = workerEvbs_.back()->getEventBase();
    evbs.push_back(workerEvb);
  }
  initialize(address, evbs, true /* useDefaultTransport */);
  start();
}

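// initialize() runs in two phases: initializeWorkers() creates one
// QuicServerWorker per event base, then bindWorkersToSocket() binds each
// worker's UDP socket; initialized_ only becomes true once the last worker has
// bound (see bindWorkersToSocket()).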
void QuicServer::initialize(
    const folly::SocketAddress& address,
    const std::vector<folly::EventBase*>& evbs,
    bool useDefaultTransport) {
  CHECK(!evbs.empty());
  CHECK_LE(evbs.size(), std::numeric_limits<uint8_t>::max())
      << "Quic Server does not support more than "
      << std::numeric_limits<uint8_t>::max() << " workers";
  CHECK(shutdown_);
  shutdown_ = false;

  // setting default stateless reset token if not set
  if (!transportSettings_.statelessResetTokenSecret) {
    std::array<uint8_t, kStatelessResetTokenSecretLength> secret;
    folly::Random::secureRandom(secret.data(), secret.size());
    transportSettings_.statelessResetTokenSecret = secret;
  }

  // if the connid algo factory is not set, use the default impl
  if (!connIdAlgoFactory_) {
    connIdAlgoFactory_ = std::make_unique<DefaultConnectionIdAlgoFactory>();
  }
  connIdAlgo_ = connIdAlgoFactory_->make();
  if (!ccFactory_) {
    ccFactory_ = std::make_shared<ServerCongestionControllerFactory>();
  }

  initializeWorkers(evbs, useDefaultTransport);
  bindWorkersToSocket(address, evbs);
}

void QuicServer::initializeWorkers(
    const std::vector<folly::EventBase*>& evbs,
    bool useDefaultTransport) {
  CHECK(workers_.empty());
  // iterate in the order of insertion in vector
  for (size_t i = 0; i < evbs.size(); ++i) {
    auto workerEvb = evbs[i];
    auto worker = newWorkerWithoutSocket();
    if (useDefaultTransport) {
      CHECK(transportFactory_) << "Transport factory is not set";
      worker->setTransportFactory(transportFactory_.get());
      worker->setFizzContext(ctx_);
    }
    if (healthCheckToken_) {
      worker->setHealthCheckToken(*healthCheckToken_);
    }
    if (transportStatsFactory_) {
      workerEvb->runInEventBaseThread(
          [self = this->shared_from_this(),
           workerPtr = worker.get(),
           transportStatsFactory = transportStatsFactory_.get()] {
            if (self->shutdown_) {
              return;
            }
            auto statsCallback = transportStatsFactory->make();
            CHECK(statsCallback);
            workerPtr->setTransportStatsCallback(std::move(statsCallback));
          });
    }
    worker->setConnectionIdAlgo(connIdAlgoFactory_->make());
    worker->setCongestionControllerFactory(ccFactory_);
    if (rateLimit_) {
      worker->setRateLimiter(std::make_unique<SlidingWindowRateLimiter>(
          rateLimit_->count, rateLimit_->window));
    }
    worker->setWorkerId(i);
    worker->setTransportSettingsOverrideFn(transportSettingsOverrideFn_);
    workers_.push_back(std::move(worker));
    evbToWorkers_.emplace(workerEvb, workers_.back().get());
  }
}

std::unique_ptr<QuicServerWorker> QuicServer::newWorkerWithoutSocket() {
  auto worker = std::make_unique<QuicServerWorker>(
      this->shared_from_this(), FLAGS_qs_io_uring_use_async_recv);
  worker->setNewConnectionSocketFactory(socketFactory_.get());
  worker->setSupportedVersions(supportedVersions_);
  worker->setTransportSettings(transportSettings_);
  worker->rejectNewConnections(rejectNewConnections_);
  worker->setProcessId(processId_);
  worker->setHostId(hostId_);
  worker->setConnectionIdVersion(cidVersion_);
  return worker;
}

void QuicServer::bindWorkersToSocket(
    const folly::SocketAddress& address,
    const std::vector<folly::EventBase*>& evbs) {
  auto numWorkers = evbs.size();
  CHECK(!initialized_);
  boundAddress_ = address;
  auto usingCCP = isUsingCCP();
  if (!usingCCP) {
    LOG(INFO) << "NOT using CCP";
  }
  auto ccpInitFailed = false;
  for (size_t i = 0; i < numWorkers; ++i) {
    auto workerEvb = evbs[i];
    workerEvb->runImmediatelyOrRunInEventBaseThreadAndWait(
        [self = this->shared_from_this(),
         workerEvb,
         numWorkers,
         usingCCP,
         &ccpInitFailed,
         processId = processId_,
         idx = i] {
          std::lock_guard<std::mutex> guard(self->startMutex_);
          if (self->shutdown_) {
            return;
          }
          auto workerSocket = self->listenerSocketFactory_->make(workerEvb, -1);
          auto it = self->evbToWorkers_.find(workerEvb);
          CHECK(it != self->evbToWorkers_.end());
          auto worker = it->second;
          int takeoverOverFd = -1;
          if (self->listeningFDs_.size() > idx) {
            takeoverOverFd = self->listeningFDs_[idx];
          }
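          // (If fds were handed in via setListeningFDs() -- e.g. sockets
          // inherited from a previous instance during a takeover -- the one at
          // this worker's index is reused below instead of binding afresh.)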
          worker->setSocketOptions(&self->socketOptions_);
          // dup the taken-over fd for this worker if one was provided,
          // otherwise bind a fresh socket
          if (takeoverOverFd >= 0) {
            workerSocket->setFD(
                folly::NetworkSocket::fromFd(::dup(takeoverOverFd)),
                // set ownership to OWNS to allow ::close()'ing of the fd
                // when this server goes away
                folly::AsyncUDPSocket::FDOwnership::OWNS);
            worker->setSocket(std::move(workerSocket));
            if (idx == 0) {
              self->boundAddress_ = worker->getAddress();
            }
            VLOG(4) << "Set up dup()'ed fd for address=" << self->boundAddress_
                    << " on workerId=" << (int)worker->getWorkerId();
            worker->applyAllSocketOptions();
          } else {
            VLOG(4) << "No valid taken-over fd found for address="
                    << self->boundAddress_ << ". binding on worker=" << worker
                    << " workerId=" << (int)worker->getWorkerId()
                    << " processId=" << (int)processId;
            worker->setSocket(std::move(workerSocket));
            worker->bind(self->boundAddress_, self->bindOptions_);
            if (idx == 0) {
              self->boundAddress_ = worker->getAddress();
            }
          }
          if (usingCCP) {
            auto serverId = self->boundAddress_.getIPAddress().hash() |
                self->boundAddress_.getPort();
            try {
              worker->getCcpReader()->try_initialize(
                  worker->getEventBase(),
                  self->ccpId_,
                  serverId,
                  worker->getWorkerId());
              worker->getCcpReader()->connect();
            } catch (const folly::AsyncSocketException& ex) {
              // probably means the unix socket failed to bind
              LOG(ERROR) << "exception while initializing ccp: " << ex.what()
                         << "\nshutting down...";
              // TODO also update counters
              ccpInitFailed = true;
            } catch (const std::exception& ex) {
              LOG(ERROR) << "exception initializing ccp: " << ex.what()
                         << "\nshutting down...";
              ccpInitFailed = true;
            }
          }
          if (idx == (numWorkers - 1)) {
            VLOG(4) << "Initialized all workers in the eventbase";
            self->initialized_ = true;
            self->startCv_.notify_all();
          }
        });
  }
  if (usingCCP && ccpInitFailed) {
    shutdown();
  }
}

void QuicServer::start() {
  CHECK(initialized_);
  // initialize the thread local ptr to workers
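  // (workerPtr_ is thread-local: after this, the thread running each worker's
  // event base sees its own QuicServerWorker through workerPtr_, which is what
  // lets routeDataToWorker() dispatch inline when it is already on the right
  // thread.)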
  runOnAllWorkers([&](auto worker) mutable {
    // pass in no-op deleter to ThreadLocalPtr since the destruction of
    // QuicServerWorker is managed by the QuicServer
    workerPtr_.reset(
        worker, [](auto /* worker */, folly::TLPDestructionMode) {});
  });
  auto usingCCP = isUsingCCP();
  for (auto& worker : workers_) {
    worker->getEventBase()->runInEventBaseThread([&worker, usingCCP] {
      if (usingCCP) {
        worker->getCcpReader()->start();
      }
      worker->start();
    });
  }
}

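// Takeover, roughly (the exact choreography lives outside this class and is
// only sketched here): an instance that expects to hand its connections off
// calls allowBeingTakenOver() so every worker also listens on a takeover
// handler port; a successor instance is then given the UDP fds (cf.
// setListeningFDs()) and a distinct ProcessId, and forwards packets that still
// belong to the old instance's connections to that takeover address (cf.
// startPacketForwarding() / stopPacketForwarding()).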
void QuicServer::allowBeingTakenOver(const folly::SocketAddress& addr) {
  // synchronously bind workers to takeover handler port.
  // This method should not be called from a worker
  CHECK(!workers_.empty());
  CHECK(!shutdown_);

  // this function shouldn't be called from worker's thread
  for (auto& worker : workers_) {
    DCHECK(
        // if the eventbase is not running, it returns true for isInEvbThread()
        !worker->getEventBase()->isRunning() ||
        !worker->getEventBase()->isInEventBaseThread());
  }
  // TODO workers_ (vector) is not protected against concurrent modifications
  auto numWorkers = workers_.size();
  for (size_t i = 0; i < numWorkers; ++i) {
    auto workerEvb = workers_[i]->getEventBase();
    workerEvb->runInEventBaseThreadAndWait([&] {
      std::lock_guard<std::mutex> guard(startMutex_);
      CHECK(initialized_);
      auto localListenSocket = listenerSocketFactory_->make(workerEvb, -1);
      auto it = evbToWorkers_.find(workerEvb);
      CHECK(it != evbToWorkers_.end());
      auto worker = it->second;
      worker->allowBeingTakenOver(std::move(localListenSocket), addr);
    });
  }
  VLOG(4) << "Bind all workers in the eventbase to takeover handler port";
  takeoverHandlerInitialized_ = true;
}

folly::SocketAddress QuicServer::overrideTakeoverHandlerAddress(
    const folly::SocketAddress& addr) {
  // synchronously bind workers to takeover handler port.
  // This method should not be called from a worker
  CHECK(!workers_.empty());
  CHECK(!shutdown_);
  CHECK(takeoverHandlerInitialized_) << "TakeoverHandlers are not initialized.";

  // this function shouldn't be called from worker's thread
  for (auto& worker : workers_) {
    DCHECK(
        // if the eventbase is not running, it returns true for isInEvbThread()
        !worker->getEventBase()->isRunning() ||
        !worker->getEventBase()->isInEventBaseThread());
  }
  folly::SocketAddress boundAddress;
  for (auto& worker : workers_) {
    worker->getEventBase()->runInEventBaseThreadAndWait([&] {
      std::lock_guard<std::mutex> guard(startMutex_);
      CHECK(initialized_);
      auto workerEvb = worker->getEventBase();
      auto localListenSocket = listenerSocketFactory_->make(workerEvb, -1);
      boundAddress = worker->overrideTakeoverHandlerAddress(
          std::move(localListenSocket), addr);
    });
  }
  return boundAddress;
}

void QuicServer::pauseRead() {
  runOnAllWorkersSync([&](auto worker) mutable { worker->pauseRead(); });
}

void QuicServer::routeDataToWorker(
    const folly::SocketAddress& client,
    RoutingData&& routingData,
    NetworkData&& networkData,
    folly::Optional<QuicVersion> quicVersion,
    bool isForwardedData) {
  // figure out worker idx
  if (!initialized_) {
    // drop the packet if we are not initialized. This is a janky memory
    // barrier.
    VLOG(4) << "Dropping data since quic-server is not initialized";
    if (workerPtr_) {
      QUIC_STATS(
          workerPtr_->getTransportStatsCallback(),
          onPacketDropped,
          QuicTransportStatsCallback::PacketDropReason::WORKER_NOT_INITIALIZED);
    }
    return;
  }

  if (shutdown_) {
    VLOG(4) << "Dropping data since quic server is shutdown";
    if (workerPtr_) {
      QUIC_STATS(
          workerPtr_->getTransportStatsCallback(),
          onPacketDropped,
          QuicTransportStatsCallback::PacketDropReason::SERVER_SHUTDOWN);
    }
    return;
  }

  // For initial or zeroRtt packets, pick the worker that the kernel / bpf
  // routed to. Without this, when the (bpf / kernel) hash and userspace hash
  // get out of sync (e.g. due to shuffling of sockets in the hash ring), it
  // results in a very high number of 'misses'.
  if (routingData.isUsingClientConnId && workerPtr_) {
    CHECK(workerPtr_->getEventBase()->isInEventBaseThread());
    workerPtr_->dispatchPacketData(
        client,
        std::move(routingData),
        std::move(networkData),
        quicVersion,
        isForwardedData);
    return;
  }

  auto workerToRunOn =
      getWorkerToRouteTo(routingData, workers_.size(), connIdAlgo_.get());
  auto& worker = workers_[workerToRunOn];
  VLOG_IF(4, !worker->getEventBase()->isInEventBaseThread())
      << " Routing to worker in different EVB, to workerId=" << workerToRunOn;
  folly::EventBase* workerEvb = worker->getEventBase();
  bool isInEvb = workerEvb->isInEventBaseThread();
  if (isInEvb) {
    worker->dispatchPacketData(
        client,
        std::move(routingData),
        std::move(networkData),
        quicVersion,
        isForwardedData);
    return;
  }
  worker->getEventBase()->runInEventBaseThread([server =
                                                    this->shared_from_this(),
                                                cl = client,
                                                routingData =
                                                    std::move(routingData),
                                                w = worker.get(),
                                                buf = std::move(networkData),
                                                isForwarded = isForwardedData,
                                                quicVersion]() mutable {
    if (server->shutdown_) {
      return;
    }
    w->dispatchPacketData(
        cl, std::move(routingData), std::move(buf), quicVersion, isForwarded);
  });
}

void QuicServer::handleWorkerError(LocalErrorCode error) {
  shutdown(error);
}

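// Blocks until either the last worker has bound its socket (which sets
// initialized_ and notifies startCv_ in bindWorkersToSocket()) or the server
// is shut down.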
void QuicServer::waitUntilInitialized() {
  std::unique_lock<std::mutex> guard(startMutex_);
  if (shutdown_ || initialized_) {
    return;
  }
  for (auto& worker : workers_) {
    DCHECK(!worker->getEventBase()->isInEventBaseThread());
  }
  startCv_.wait(guard, [&] { return initialized_ || shutdown_; });
}

QuicServer::~QuicServer() {
  shutdown(LocalErrorCode::SHUTTING_DOWN);
}

void QuicServer::shutdown(LocalErrorCode error) {
  if (shutdown_) {
    return;
  }
  shutdown_ = true;
  auto usingCCP = isUsingCCP();
  for (auto& worker : workers_) {
    worker->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
      worker->shutdownAllConnections(error);
      if (usingCCP) {
        worker->getCcpReader()->shutdown();
      }
      workerPtr_.reset();
    });
    // protecting the erase in map with the mutex since
    // the erase could potentially affect concurrent accesses from other threads
    std::lock_guard<std::mutex> guard(startMutex_);
    evbToWorkers_.erase(worker->getEventBase());
    evbToAcceptors_.erase(worker->getEventBase());
  }
  startCv_.notify_all();
}

bool QuicServer::hasShutdown() const noexcept {
  return shutdown_;
}

void QuicServer::runOnAllWorkers(
    const std::function<void(QuicServerWorker*)>& func) {
  std::lock_guard<std::mutex> guard(startMutex_);
  if (shutdown_) {
    return;
  }
  for (auto& worker : workers_) {
    worker->getEventBase()->runInEventBaseThread(
        [&worker, self = this->shared_from_this(), func]() mutable {
          if (self->shutdown_) {
            return;
          }
          func(worker.get());
        });
  }
}

void QuicServer::runOnAllWorkersSync(
    const std::function<void(QuicServerWorker*)>& func) {
  std::lock_guard<std::mutex> guard(startMutex_);
  if (shutdown_) {
    return;
  }
  for (auto& worker : workers_) {
    worker->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
        [&worker, self = this->shared_from_this(), func]() mutable {
          if (self->shutdown_) {
            return;
          }
          func(worker.get());
        });
  }
}

void QuicServer::setHostId(uint32_t hostId) noexcept {
  CHECK(!initialized_) << "Host id must be set before initializing Quic server";
  hostId_ = hostId;
}

void QuicServer::setConnectionIdVersion(
    ConnectionIdVersion cidVersion) noexcept {
  CHECK(!initialized_)
      << "ConnectionIdVersion must be set before initializing Quic server";
  cidVersion_ = cidVersion;
}

void QuicServer::setTransportSettingsOverrideFn(
    TransportSettingsOverrideFn fn) {
  CHECK(!initialized_) << "Transport settings override function must be "
                       << "set before initializing Quic server";
  transportSettingsOverrideFn_ = std::move(fn);
}

void QuicServer::setHealthCheckToken(const std::string& healthCheckToken) {
  // Make sure the token satisfies the required properties, i.e. it is not a
  // valid quic header.
  auto parsed = parseHeader(*folly::IOBuf::copyBuffer(healthCheckToken));
  CHECK(!parsed.hasValue());
  CHECK_GT(healthCheckToken.size(), kMinHealthCheckTokenSize);
  healthCheckToken_ = healthCheckToken;
  runOnAllWorkers([healthCheckToken](auto worker) mutable {
    worker->setHealthCheckToken(healthCheckToken);
  });
}

void QuicServer::setFizzContext(
    std::shared_ptr<const fizz::server::FizzServerContext> ctx) {
  ctx_ = ctx;
  runOnAllWorkers([ctx](auto worker) mutable { worker->setFizzContext(ctx); });
}

void QuicServer::setFizzContext(
    folly::EventBase* evb,
    std::shared_ptr<const fizz::server::FizzServerContext> ctx) {
  CHECK(evb);
  CHECK(ctx);
  evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
    std::lock_guard<std::mutex> guard(startMutex_);
    if (shutdown_) {
      return;
    }
    auto it = evbToWorkers_.find(evb);
    CHECK(it != evbToWorkers_.end());
    it->second->setFizzContext(ctx);
  });
}

const TransportSettings& QuicServer::getTransportSettings() const noexcept {
  return transportSettings_;
}

void QuicServer::setTransportSettings(TransportSettings transportSettings) {
  transportSettings_ = transportSettings;
  runOnAllWorkers([transportSettings](auto worker) mutable {
    worker->setTransportSettings(transportSettings);
  });
}

void QuicServer::setCcpId(uint64_t ccpId) {
  ccpId_ = ccpId;
}

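// CCP here refers to an out-of-process congestion control plane: when enabled,
// each worker owns a CCPReader that talks to an external CCP agent (over a
// unix-domain socket, per the error handling in bindWorkersToSocket()). The
// server only uses CCP when a non-zero id was configured via setCcpId(), the
// default congestion controller is CongestionControlType::CCP, and the build
// has CCP_ENABLED.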
bool QuicServer::isUsingCCP() {
  auto foundId = ccpId_ != 0;
#ifdef CCP_ENABLED
  auto default_is_ccp = transportSettings_.defaultCongestionController ==
      CongestionControlType::CCP;
  return foundId && default_is_ccp;
#else
  if (foundId) {
    LOG(ERROR)
        << "found ccp id, but server was not compiled with ccp support. recompile with -DCCP_ENABLED.";
  }
  return false;
#endif
}

void QuicServer::rejectNewConnections(std::function<bool()> rejectFn) {
  rejectNewConnections_ = rejectFn;
  runOnAllWorkers([rejectFn](auto worker) mutable {
    worker->rejectNewConnections(rejectFn);
  });
}

void QuicServer::blockListedSrcPort(
    std::function<bool(uint16_t)> isBlockListedSrcPort) {
  isBlockListedSrcPort_ = isBlockListedSrcPort;
  runOnAllWorkers([isBlockListedSrcPort](auto worker) mutable {
    worker->setIsBlockListedSrcPort(isBlockListedSrcPort);
  });
}

void QuicServer::startPacketForwarding(const folly::SocketAddress& destAddr) {
  if (initialized_) {
    runOnAllWorkersSync([destAddr](auto worker) mutable {
      worker->startPacketForwarding(destAddr);
    });
  }
}

void QuicServer::stopPacketForwarding(std::chrono::milliseconds delay) {
  std::lock_guard<std::mutex> guard(startMutex_);
  if (!initialized_ || shutdown_) {
    return;
  }
  for (auto& worker : workers_) {
    worker->getEventBase()->runInEventBaseThreadAndWait(
        [&worker, self = this->shared_from_this(), delay]() mutable {
          if (self->shutdown_) {
            return;
          }
          worker->getEventBase()->runAfterDelay(
              [&worker, self]() mutable {
                if (worker && !self->shutdown_) {
                  worker->stopPacketForwarding();
                }
              },
              delay.count());
        });
  }
}

void QuicServer::setTransportStatsCallbackFactory(
    std::unique_ptr<QuicTransportStatsCallbackFactory> statsFactory) {
  CHECK(statsFactory);
  transportStatsFactory_ = std::move(statsFactory);
}

void QuicServer::setConnectionIdAlgoFactory(
    std::unique_ptr<ConnectionIdAlgoFactory> connIdAlgoFactory) {
  CHECK(!initialized_);
  CHECK(connIdAlgoFactory);
  connIdAlgoFactory_ = std::move(connIdAlgoFactory);
}

void QuicServer::addTransportFactory(
    folly::EventBase* evb,
    QuicServerTransportFactory* acceptor) {
  CHECK(evb);
  CHECK(acceptor);
  evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
    std::lock_guard<std::mutex> guard(startMutex_);
    if (shutdown_) {
      return;
    }
    evbToAcceptors_.emplace(evb, acceptor);
    auto it = evbToWorkers_.find(evb);
    if (it != evbToWorkers_.end()) {
      it->second->setTransportFactory(acceptor);
    } else {
      VLOG(3) << "Couldn't find associated worker for the given eventbase";
    }
  });
}

const folly::SocketAddress& QuicServer::getAddress() const {
  CHECK(initialized_) << "Quic server is not initialized. "
                      << "Consider calling waitUntilInitialized() before this ";
  return boundAddress_;
}

void QuicServer::setListeningFDs(const std::vector<int>& fds) {
  std::lock_guard<std::mutex> guard(startMutex_);
  listeningFDs_ = fds;
}

int QuicServer::getListeningSocketFD() const {
  CHECK(initialized_) << "Quic server is not initialized. "
                      << "Consider calling waitUntilInitialized() before this ";
  return workers_[0]->getFD();
}

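// Returns one fd per worker, indexed by worker id; these are the fds a
// successor process would typically be handed (and would then pass to its own
// setListeningFDs()) during a takeover, though that handoff is not performed
// here.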
getAllListeningSocketFDs() const730 std::vector<int> QuicServer::getAllListeningSocketFDs() const noexcept {
731   CHECK(initialized_) << "Quic server is not initialized. "
732                       << "Consider calling waitUntilInitialized() before this ";
733   std::vector<int> sockets(workers_.size());
734   for (const auto& worker : workers_) {
735     if (worker->getFD() != -1) {
736       CHECK_LT(worker->getWorkerId(), workers_.size());
737       sockets.at(worker->getWorkerId()) = worker->getFD();
738     }
739   }
740   return sockets;
741 }
742 
getAllConnectionsStats(std::vector<QuicConnectionStats> & stats)743 void QuicServer::getAllConnectionsStats(
744     std::vector<QuicConnectionStats>& stats) {
745   runOnAllWorkersSync(
746       [&stats](auto worker) mutable { worker->getAllConnectionsStats(stats); });
747 }
748 
getTakeoverProtocolVersion() const749 TakeoverProtocolVersion QuicServer::getTakeoverProtocolVersion()
750     const noexcept {
751   return workers_[0]->getTakeoverProtocolVersion();
752 }
753 
getTakeoverHandlerSocketFD() const754 int QuicServer::getTakeoverHandlerSocketFD() const {
  CHECK(takeoverHandlerInitialized_) << "TakeoverHandlers are not initialized.";
  return workers_[0]->getTakeoverHandlerSocketFD();
}

std::vector<folly::EventBase*> QuicServer::getWorkerEvbs() const noexcept {
  CHECK(initialized_) << "Quic server is not initialized. ";
  std::vector<folly::EventBase*> ebvs;
  for (const auto& worker : workers_) {
    ebvs.push_back(worker->getEventBase());
  }
  return ebvs;
}

bool QuicServer::addAcceptObserver(
    folly::EventBase* evb,
    AcceptObserver* observer) {
  CHECK(initialized_);
  CHECK(evb);
  bool success = false;
  evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
    std::lock_guard<std::mutex> guard(startMutex_);
    if (shutdown_) {
      return;
    }
    auto it = evbToWorkers_.find(evb);
    if (it != evbToWorkers_.end()) {
      it->second->addAcceptObserver(observer);
      success = true;
    } else {
      VLOG(3) << "Couldn't find associated worker for the given eventbase, "
              << "unable to add AcceptObserver";
      success = false;
    }
  });
  return success;
}

bool QuicServer::removeAcceptObserver(
    folly::EventBase* evb,
    AcceptObserver* observer) {
  CHECK(initialized_);
  CHECK(evb);
  bool success = false;
  evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
    std::lock_guard<std::mutex> guard(startMutex_);
    if (shutdown_) {
      return;
    }
    auto it = evbToWorkers_.find(evb);
    if (it != evbToWorkers_.end()) {
      success = it->second->removeAcceptObserver(observer);
    } else {
      VLOG(3) << "Couldn't find associated worker for the given eventbase, "
              << "unable to remove AcceptObserver";
      success = false;
    }
  });
  return success;
}

} // namespace quic