1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <signal.h>
18 
19 #include <folly/FileUtil.h>
20 #include <folly/Likely.h>
21 #include <folly/SpinLock.h>
22 #include <folly/String.h>
23 #include <folly/container/F14Map.h>
24 #include <folly/container/F14Set.h>
25 #include <folly/experimental/io/IoUringBackend.h>
26 #include <folly/portability/GFlags.h>
27 #include <folly/portability/Sockets.h>
28 #include <folly/synchronization/CallOnce.h>
29 
30 #if __has_include(<sys/timerfd.h>)
31 #include <sys/timerfd.h>
32 #endif
33 
34 #if __has_include(<liburing.h>)
35 
36 extern "C" FOLLY_ATTR_WEAK void eb_poll_loop_pre_hook(uint64_t* call_time);
37 extern "C" FOLLY_ATTR_WEAK void eb_poll_loop_post_hook(
38     uint64_t call_time, int ret);
39 
40 namespace {
41 struct SignalRegistry {
42   struct SigInfo {
43     struct sigaction sa_ {};
44     size_t refs_{0};
45   };
46   using SignalMap = std::map<int, SigInfo>;
47 
SignalRegistry__anone85a6d8a0111::SignalRegistry48   constexpr SignalRegistry() {}
49 
50   void notify(int sig);
51   void setNotifyFd(int sig, int fd);
52 
53   // lock protecting the signal map
54   folly::MicroSpinLock mapLock_ = {0};
55   std::unique_ptr<SignalMap> map_;
56   std::atomic<int> notifyFd_{-1};
57 };
58 
getSignalRegistry()59 SignalRegistry& getSignalRegistry() {
60   static auto& sInstance = *new SignalRegistry();
61   return sInstance;
62 }
63 
evSigHandler(int sig)64 void evSigHandler(int sig) {
65   getSignalRegistry().notify(sig);
66 }
67 
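// Invoked from the signal handler installed by setNotifyFd(). It only does
// async-signal-safe work: a single write() of the signal number to the
// notification fd, which the owning backend drains on its event loop.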
void SignalRegistry::notify(int sig) {
  // use try_lock in case somebody already has the lock
  std::unique_lock<folly::MicroSpinLock> lk(mapLock_, std::try_to_lock);
  if (lk.owns_lock()) {
    int fd = notifyFd_.load();
    if (fd >= 0) {
      uint8_t sigNum = static_cast<uint8_t>(sig);
      ::write(fd, &sigNum, 1);
    }
  }
}

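// Registers (fd >= 0) or unregisters (fd < 0) interest in a signal. The first
// registration for a signal installs evSigHandler via sigaction() and saves
// the previous disposition; the last unregistration restores it. Entries are
// reference counted so multiple events can share the same signal.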
void SignalRegistry::setNotifyFd(int sig, int fd) {
  std::lock_guard<folly::MicroSpinLock> g(mapLock_);
  if (fd >= 0) {
    if (!map_) {
      map_ = std::make_unique<SignalMap>();
    }
    // switch the fd
    notifyFd_.store(fd);

    auto iter = (*map_).find(sig);
    if (iter != (*map_).end()) {
      iter->second.refs_++;
    } else {
      auto& entry = (*map_)[sig];
      entry.refs_ = 1;
      struct sigaction sa = {};
      sa.sa_handler = evSigHandler;
      sa.sa_flags |= SA_RESTART;
      ::sigfillset(&sa.sa_mask);

      if (::sigaction(sig, &sa, &entry.sa_) == -1) {
        (*map_).erase(sig);
      }
    }
  } else {
    notifyFd_.store(fd);

    if (map_) {
      auto iter = (*map_).find(sig);
      if ((iter != (*map_).end()) && (--iter->second.refs_ == 0)) {
        auto entry = iter->second;
        (*map_).erase(iter);
        // just restore
        ::sigaction(sig, &entry.sa_, nullptr);
      }
    }
  }
}

} // namespace

namespace {
class SQGroupInfoRegistry {
 private:
  // a group is a collection of io_uring instances
  // that share up to numThreads SQ poll threads
  struct SQGroupInfo {
    struct SQSubGroupInfo {
      folly::F14FastSet<int> fds;
      size_t count{0};

      void add(int fd) {
        CHECK(fds.find(fd) == fds.end());
        fds.insert(fd);
        ++count;
      }

      size_t remove(int fd) {
        auto iter = fds.find(fd);
        CHECK(iter != fds.end());
        fds.erase(fd);
        --count;

        return count;
      }
    };

    SQGroupInfo(size_t num, std::set<uint32_t> const& cpus) : subGroups(num) {
      for (const uint32_t cpu : cpus) {
        nextCpu.emplace_back(cpu);
      }
    }

    // returns the least loaded subgroup
    SQSubGroupInfo* getNextSubgroup() {
      size_t min_idx = 0;
      for (size_t i = 0; i < subGroups.size(); i++) {
        if (subGroups[i].count == 0) {
          return &subGroups[i];
        }

        if (subGroups[i].count < subGroups[min_idx].count) {
          min_idx = i;
        }
      }

      return &subGroups[min_idx];
    }

    size_t add(int fd, SQSubGroupInfo* sg) {
      CHECK(fdSgMap.find(fd) == fdSgMap.end());
      fdSgMap.insert(std::make_pair(fd, sg));
      sg->add(fd);
      ++count;

      return count;
    }

    size_t remove(int fd) {
      auto iter = fdSgMap.find(fd);
      CHECK(iter != fdSgMap.end());
      iter->second->remove(fd);
      fdSgMap.erase(iter);
      --count;

      return count;
    }

    // file descriptor to sub group index map
    folly::F14FastMap<int, SQSubGroupInfo*> fdSgMap;
    // array of subgroups
    std::vector<SQSubGroupInfo> subGroups;
    // number of entries
    size_t count{0};
    // Set of CPUs we will bind threads to.
    std::vector<uint32_t> nextCpu;
    int nextCpuIndex{0};
  };

  using SQGroupInfoMap = folly::F14FastMap<std::string, SQGroupInfo>;
  SQGroupInfoMap map_;
  std::mutex mutex_;

 public:
  SQGroupInfoRegistry() = default;
  ~SQGroupInfoRegistry() = default;

  using FDCreateFunc = folly::Function<int(struct io_uring_params&)>;
  using FDCloseFunc = folly::Function<void()>;

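  // Creates an io_uring fd via createFd() and, if a group name is given,
  // attaches it to the least loaded subgroup: a non-empty subgroup shares its
  // existing SQ poll thread via IORING_SETUP_ATTACH_WQ, while a fresh
  // subgroup may pin its new poll thread to the next CPU from the group's
  // CPU set via IORING_SETUP_SQ_AFF. Returns the group size (0 on failure or
  // when no group is used).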
  size_t addTo(
      const std::string& groupName,
      size_t groupNumThreads,
      FDCreateFunc& createFd,
      struct io_uring_params& params,
      std::set<uint32_t> const& cpus) {
    if (groupName.empty()) {
      createFd(params);
      return 0;
    }

    std::lock_guard g(mutex_);

    SQGroupInfo::SQSubGroupInfo* sg = nullptr;
    SQGroupInfo* info = nullptr;
    auto iter = map_.find(groupName);
    if (iter != map_.end()) {
      info = &iter->second;
    } else {
      // First use of this group.
      SQGroupInfo gr(groupNumThreads, cpus);
      info =
          &map_.insert(std::make_pair(groupName, std::move(gr))).first->second;
    }
    sg = info->getNextSubgroup();
    if (sg->count) {
      // we're adding to a non-empty subgroup
      params.wq_fd = *(sg->fds.begin());
      params.flags |= IORING_SETUP_ATTACH_WQ;
    } else {
      // First use of this subgroup, pin thread to CPU if specified.
      if (info->nextCpu.size()) {
        uint32_t cpu = info->nextCpu[info->nextCpuIndex];
        info->nextCpuIndex = (info->nextCpuIndex + 1) % info->nextCpu.size();

        params.sq_thread_cpu = cpu;
        params.flags |= IORING_SETUP_SQ_AFF;
      }
    }

    auto fd = createFd(params);
    if (fd < 0) {
      return 0;
    }

    return info->add(fd, sg);
  }

  size_t removeFrom(const std::string& groupName, int fd, FDCloseFunc& func) {
    if (groupName.empty()) {
      func();
      return 0;
    }

    size_t ret;

    std::lock_guard g(mutex_);

    func();

    auto iter = map_.find(groupName);
    CHECK(iter != map_.end());
    // check for empty group
    if ((ret = iter->second.remove(fd)) == 0) {
      map_.erase(iter);
    }

    return ret;
  }
};

static folly::Indestructible<SQGroupInfoRegistry> sSQGroupInfoRegistry;

} // namespace

namespace folly {
IoUringBackend::TimerEntry::TimerEntry(
    Event* event, const struct timeval& timeout)
    : event_(event) {
  setExpireTime(timeout, std::chrono::steady_clock::now());
}

IoUringBackend::SocketPair::SocketPair() {
  if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds_.data())) {
    throw std::runtime_error("socketpair error");
  }

  // set the sockets to non-blocking mode
  for (auto fd : fds_) {
    auto flags = ::fcntl(fd, F_GETFL, 0);
    ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  }
}

IoUringBackend::SocketPair::~SocketPair() {
  for (auto fd : fds_) {
    if (fd >= 0) {
      ::close(fd);
    }
  }
}

IoUringBackend::FdRegistry::FdRegistry(struct io_uring& ioRing, size_t n)
    : ioRing_(ioRing), files_(n, -1), inUse_(n), records_(n) {}

int IoUringBackend::FdRegistry::init() {
  if (inUse_) {
    int ret = ::io_uring_register_files(&ioRing_, files_.data(), inUse_);

    if (!ret) {
      // build and set the free list head if we succeed
      for (size_t i = 0; i < records_.size(); i++) {
        records_[i].idx_ = i;
        free_.push_front(records_[i]);
      }
    } else {
      LOG(ERROR) << "io_uring_register_files(" << inUse_ << ") "
                 << "failed errno = " << errno << ":\""
                 << folly::errnoStr(errno) << "\" " << this;
    }

    return ret;
  }

  return 0;
}

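// Grabs a free slot from the registered-files free list and points it at fd
// via io_uring_register_files_update(). Returns nullptr (callers fall back to
// plain fds) if the free list is empty or a previous update already failed.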
IoUringBackend::FdRegistrationRecord* IoUringBackend::FdRegistry::alloc(
    int fd) {
  if (FOLLY_UNLIKELY(err_ || free_.empty())) {
    return nullptr;
  }

  auto& record = free_.front();

  // now we have an idx
  int ret = ::io_uring_register_files_update(&ioRing_, record.idx_, &fd, 1);
  if (ret != 1) {
    // set the err_ flag so we do not retry again
    // this usually happens when we hit the file desc limit
    // and retrying this operation for every request is expensive
    err_ = true;
    LOG(ERROR) << "io_uring_register_files(1) "
               << "failed errno = " << errno << ":\"" << folly::errnoStr(errno)
               << "\" " << this;
    return nullptr;
  }

  record.fd_ = fd;
  record.count_ = 1;
  free_.pop_front();

  return &record;
}

bool IoUringBackend::FdRegistry::free(
    IoUringBackend::FdRegistrationRecord* record) {
  if (record && (--record->count_ == 0)) {
    record->fd_ = -1;
    int ret = ::io_uring_register_files_update(
        &ioRing_, record->idx_, &record->fd_, 1);

    // we add it to the free list anyway here
    free_.push_front(*record);

    return (ret == 1);
  }

  return false;
}

IoUringBackend::IoUringBackend(Options options)
    : options_(options),
      numEntries_(options.capacity),
      fdRegistry_(ioRing_, options.useRegisteredFds ? options.capacity : 0) {
  // create the timer fd
  timerFd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
  if (timerFd_ < 0) {
    throw std::runtime_error("timerfd_create error");
  }

  ::memset(&ioRing_, 0, sizeof(ioRing_));
  ::memset(&params_, 0, sizeof(params_));

  params_.flags |= IORING_SETUP_CQSIZE;
  params_.cq_entries = options.capacity;

  // poll SQ options
  if (options.flags & Options::Flags::POLL_SQ) {
    params_.flags |= IORING_SETUP_SQPOLL;
    params_.sq_thread_idle = options.sqIdle.count();
  }

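  // Ring creation callback handed to the SQ group registry: retry with the CQ
  // capacity halved on failure until it drops below minCapacity, then give up
  // and report the backend as unavailable.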
  SQGroupInfoRegistry::FDCreateFunc func = [&](struct io_uring_params& params) {
    while (true) {
      // allocate entries both for poll add and cancel
      if (::io_uring_queue_init_params(
              2 * options_.maxSubmit, &ioRing_, &params)) {
        options.capacity /= 2;
        if (options.minCapacity && (options.capacity >= options.minCapacity)) {
          LOG(INFO) << "io_uring_queue_init_params(" << 2 * options_.maxSubmit
                    << "," << params.cq_entries << ") "
                    << "failed errno = " << errno << ":\""
                    << folly::errnoStr(errno) << "\" " << this
                    << " retrying with capacity = " << options.capacity;

          params_.cq_entries = options.capacity;
          numEntries_ = options.capacity;
        } else {
          LOG(ERROR) << "io_uring_queue_init_params(" << 2 * options_.maxSubmit
                     << "," << params.cq_entries << ") "
                     << "failed errno = " << errno << ":\""
                     << folly::errnoStr(errno) << "\" " << this;

          throw NotAvailable("io_uring_queue_init error");
        }
      } else {
        // success - break
        break;
      }
    }

    return ioRing_.ring_fd;
  };

  auto ret = sSQGroupInfoRegistry->addTo(
      options_.sqGroupName,
      options_.sqGroupNumThreads,
      func,
      params_,
      options.sqCpus);

  if (!options_.sqGroupName.empty()) {
    LOG(INFO) << "Adding to SQ poll group \"" << options_.sqGroupName
              << "\" ret = " << ret << " fd = " << ioRing_.ring_fd;
  }

  numEntries_ *= 2;

  // timer entry
  timerEntry_ = std::make_unique<IoSqe>(this, false, true /*persist*/);
  timerEntry_->backendCb_ = IoUringBackend::processTimerIoSqe;

  // signal entry
  signalReadEntry_ = std::make_unique<IoSqe>(this, false, true /*persist*/);
  signalReadEntry_->backendCb_ = IoUringBackend::processSignalReadIoSqe;

  // we need to call init() before adding the timer fd
  // so we avoid a deadlock waiting for the queue to be drained
  if (options.useRegisteredFds) {
    // now init the file registry
    // if this fails, we still continue since we
    // can run without registered fds
    fdRegistry_.init();
  }

  // delay adding the timer and signal fds until running the loop first time
}

IoUringBackend::~IoUringBackend() {
  shuttingDown_ = true;

  cleanup();

  CHECK(!timerEntry_);
  CHECK(!signalReadEntry_);
  CHECK(freeList_.empty());

  ::close(timerFd_);
}

void IoUringBackend::cleanup() {
  if (ioRing_.ring_fd > 0) {
    // release the nonsubmitted items from the submitList
    while (!submitList_.empty()) {
      auto* ioSqe = &submitList_.front();
      submitList_.pop_front();
      releaseIoSqe(ioSqe);
    }

    // release the active events
    while (!activeEvents_.empty()) {
      auto* ioSqe = &activeEvents_.front();
      activeEvents_.pop_front();
      releaseIoSqe(ioSqe);
    }

    // wait for the outstanding events to finish
    while (numIoSqeInUse()) {
      struct io_uring_cqe* cqe = nullptr;
      ::io_uring_wait_cqe(&ioRing_, &cqe);
      if (cqe) {
        IoSqe* sqe = reinterpret_cast<IoSqe*>(io_uring_cqe_get_data(cqe));
        releaseIoSqe(sqe);
        ::io_uring_cqe_seen(&ioRing_, cqe);
      }
    }

    // free the entries
    timerEntry_.reset();
    signalReadEntry_.reset();
    freeList_.clear_and_dispose([](auto _) { delete _; });

    int fd = ioRing_.ring_fd;
    SQGroupInfoRegistry::FDCloseFunc func = [&]() {
      // exit now
      ::io_uring_queue_exit(&ioRing_);
      ioRing_.ring_fd = -1;
    };

    auto ret = sSQGroupInfoRegistry->removeFrom(
        options_.sqGroupName, ioRing_.ring_fd, func);

    if (!options_.sqGroupName.empty()) {
      LOG(INFO) << "Removing from SQ poll group \"" << options_.sqGroupName
                << "\" ret = " << ret << " fd = " << fd;
    }
  }
}

bool IoUringBackend::isAvailable() {
  static bool sAvailable = true;

  static folly::once_flag initFlag;
  folly::call_once(initFlag, [&]() {
    try {
      Options options;
      options.setCapacity(1024);
      IoUringBackend backend(options);
    } catch (const NotAvailable&) {
      sAvailable = false;
    }
  });

  return sAvailable;
}

bool IoUringBackend::addTimerFd() {
  auto* entry = allocSubmissionEntry(); // this can be nullptr
  timerEntry_->prepPollAdd(entry, timerFd_, POLLIN, true /*registerFd*/);
  return (1 == submitOne());
}

bool IoUringBackend::addSignalFds() {
  auto* entry = allocSubmissionEntry(); // this can be nullptr
  signalReadEntry_->prepPollAdd(
      entry, signalFds_.readFd(), POLLIN, false /*registerFd*/);

  return (1 == submitOne());
}

void IoUringBackend::scheduleTimeout() {
  if (!timerChanged_) {
    return;
  }

  // reset
  timerChanged_ = false;
  if (!timers_.empty()) {
    auto delta = timers_.begin()->second[0].getRemainingTime(
        std::chrono::steady_clock::now());
    if (delta.count() < 1000) {
      delta = std::chrono::microseconds(1000);
    }
    scheduleTimeout(delta);
  } else {
    scheduleTimeout(std::chrono::microseconds(0)); // disable
  }

  // we do not call addTimerFd() here
  // since it has to be added only once, after
  // we process a poll callback
}

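// Arms the timerfd as a one-shot timer (it_interval is zero) for the given
// duration; a zero duration disarms it.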
void IoUringBackend::scheduleTimeout(const std::chrono::microseconds& us) {
  struct itimerspec val;
  val.it_interval = {0, 0};
  val.it_value.tv_sec =
      std::chrono::duration_cast<std::chrono::seconds>(us).count();
  val.it_value.tv_nsec =
      std::chrono::duration_cast<std::chrono::nanoseconds>(us).count() %
      1000000000LL;

  CHECK_EQ(::timerfd_settime(timerFd_, 0, &val, nullptr), 0);
}

void IoUringBackend::addTimerEvent(
    Event& event, const struct timeval* timeout) {
  // first try to remove if already existing
  auto iter1 = eventToTimers_.find(&event);
  if (iter1 != eventToTimers_.end()) {
    // no need to remove it from eventToTimers_
    auto expireTime = iter1->second;
    auto iter2 = timers_.find(expireTime);
    for (auto iter = iter2->second.begin(), last = iter2->second.end();
         iter != last;
         ++iter) {
      if (iter->event_ == &event) {
        iter2->second.erase(iter);
        break;
      }
    }

    if (iter2->second.empty()) {
      timers_.erase(iter2);
    }
  }

  TimerEntry entry(&event, *timeout);
  if (!timerChanged_) {
    timerChanged_ =
        timers_.empty() || (entry.expireTime_ < timers_.begin()->first);
  }
  timers_[entry.expireTime_].push_back(entry);
  eventToTimers_[&event] = entry.expireTime_;
}

void IoUringBackend::removeTimerEvent(Event& event) {
  auto iter1 = eventToTimers_.find(&event);
  CHECK(iter1 != eventToTimers_.end());
  auto expireTime = iter1->second;
  eventToTimers_.erase(iter1);

  auto iter2 = timers_.find(expireTime);
  CHECK(iter2 != timers_.end());

  for (auto iter = iter2->second.begin(), last = iter2->second.end();
       iter != last;
       ++iter) {
    if (iter->event_ == &event) {
      iter2->second.erase(iter);
      break;
    }
  }

  if (iter2->second.empty()) {
    if (!timerChanged_) {
      timerChanged_ = (iter2 == timers_.begin());
    }
    timers_.erase(iter2);
  }
}

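// Drains the timerfd and fires the callback of every timer whose expiration
// is at or before now. Returns the number of timers processed.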
size_t IoUringBackend::processTimers() {
  size_t ret = 0;
  uint64_t data = 0;
  // this read can fail, but that is OK since the fd
  // will still be readable
  folly::readNoInt(timerFd_, &data, sizeof(data));

  auto now = std::chrono::steady_clock::now();
  while (!timers_.empty() && (now >= timers_.begin()->first)) {
    if (!timerChanged_) {
      timerChanged_ = true;
    }
    auto vec = std::move(timers_.begin()->second);
    timers_.erase(timers_.begin());
    for (auto& entry : vec) {
      ret++;
      eventToTimers_.erase(entry.event_);
      auto* ev = entry.event_->getEvent();
      ev->ev_res = EV_TIMEOUT;
      event_ref_flags(ev).get() = EVLIST_INIT;
      (*event_ref_callback(ev))((int)ev->ev_fd, ev->ev_res, event_ref_arg(ev));
    }
  }

  return ret;
}

void IoUringBackend::addSignalEvent(Event& event) {
  auto* ev = event.getEvent();
  signals_[ev->ev_fd].insert(&event);

  // we pass the write fd for notifications
  getSignalRegistry().setNotifyFd(ev->ev_fd, signalFds_.writeFd());
}

void IoUringBackend::removeSignalEvent(Event& event) {
  auto* ev = event.getEvent();
  auto iter = signals_.find(ev->ev_fd);
  if (iter != signals_.end()) {
    getSignalRegistry().setNotifyFd(ev->ev_fd, -1);
  }
}

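// Reads the pending signal numbers from the signal socketpair and fires the
// callbacks registered for each distinct signal, then re-arms the signal fd
// poll. Each signal is dispatched at most once per call, even if it was
// queued multiple times.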
size_t IoUringBackend::processSignals() {
  size_t ret = 0;
  static constexpr auto kNumEntries = NSIG * 2;
  static_assert(
      NSIG < 256, "Use a different data type to cover all the signal values");
  std::array<bool, NSIG> processed{};
  std::array<uint8_t, kNumEntries> signals;

  ssize_t num =
      folly::readNoInt(signalFds_.readFd(), signals.data(), signals.size());
  for (ssize_t i = 0; i < num; i++) {
    int signum = static_cast<int>(signals[i]);
    if ((signum >= 0) && (signum < static_cast<int>(processed.size())) &&
        !processed[signum]) {
      processed[signum] = true;
      auto iter = signals_.find(signum);
      if (iter != signals_.end()) {
        auto& set = iter->second;
        for (auto& event : set) {
          auto* ev = event->getEvent();
          ev->ev_res = 0;
          event_ref_flags(ev) |= EVLIST_ACTIVE;
          (*event_ref_callback(ev))(
              (int)ev->ev_fd, ev->ev_res, event_ref_arg(ev));
          event_ref_flags(ev) &= ~EVLIST_ACTIVE;
        }
      }
    }
  }
  // add the signal fd(s) back
  addSignalFds();
  return ret;
}

IoUringBackend::IoSqe* IoUringBackend::allocIoSqe(const EventCallback& cb) {
  // try to allocate from the pool first
  if ((cb.type_ == EventCallback::Type::TYPE_NONE) && (!freeList_.empty())) {
    auto* ret = &freeList_.front();
    freeList_.pop_front();
    numIoSqeInUse_++;
    return ret;
  }

  // alloc a new IoSqe
  auto* ret = allocNewIoSqe(cb);
  if (FOLLY_LIKELY(!!ret)) {
    numIoSqeInUse_++;
  }

  return ret;
}

void IoUringBackend::releaseIoSqe(IoUringBackend::IoSqe* aioIoSqe) {
  CHECK_GT(numIoSqeInUse_, 0);
  aioIoSqe->cbData_.releaseData();
  // unregister the file descriptor record
  if (aioIoSqe->fdRecord_) {
    unregisterFd(aioIoSqe->fdRecord_);
    aioIoSqe->fdRecord_ = nullptr;
  }

  if (FOLLY_LIKELY(aioIoSqe->poolAlloc_)) {
    numIoSqeInUse_--;
    aioIoSqe->event_ = nullptr;
    freeList_.push_front(*aioIoSqe);
  } else {
    if (!aioIoSqe->persist_) {
      numIoSqeInUse_--;
      delete aioIoSqe;
    }
  }
}

void IoUringBackend::processPollIo(IoSqe* ioSqe, int64_t res) noexcept {
  auto* ev = ioSqe->event_ ? (ioSqe->event_->getEvent()) : nullptr;
  if (ev) {
    if (~event_ref_flags(ev) & EVLIST_INTERNAL) {
      // if this is not a persistent event
      // remove the EVLIST_INSERTED flags
      // and dec the numInsertedEvents_
      if (~ev->ev_events & EV_PERSIST) {
        DCHECK(numInsertedEvents_ > 0);
        numInsertedEvents_--;
        event_ref_flags(ev) &= ~EVLIST_INSERTED;
      }
    }

    // add it to the active list
    event_ref_flags(ev) |= EVLIST_ACTIVE;
    ev->ev_res = res;
    activeEvents_.push_back(*ioSqe);
  } else {
    releaseIoSqe(ioSqe);
  }
}

size_t IoUringBackend::processActiveEvents() {
  size_t ret = 0;
  IoSqe* ioSqe;

  while (!activeEvents_.empty() && !loopBreak_) {
    bool release = true;
    ioSqe = &activeEvents_.front();
    activeEvents_.pop_front();
    ret++;
    auto* event = ioSqe->event_;
    auto* ev = event ? event->getEvent() : nullptr;
    if (ev) {
      // remove it from the active list
      event_ref_flags(ev) &= ~EVLIST_ACTIVE;
      bool inserted = (event_ref_flags(ev) & EVLIST_INSERTED);

      // prevent the callback from freeing the aioIoSqe
      ioSqe->useCount_++;
      if (!ioSqe->cbData_.processCb(ev->ev_res)) {
        // adjust the ev_res for the poll case
        ev->ev_res = getPollEvents(ev->ev_res, ev->ev_events);
        // handle spurious poll events that return 0
        // this can happen during high load on process startup
        if (ev->ev_res) {
          (*event_ref_callback(ev))(
              (int)ev->ev_fd, ev->ev_res, event_ref_arg(ev));
        }
      }
      // get the event again
      event = ioSqe->event_;
      ev = event ? event->getEvent() : nullptr;
      if (ev && inserted && event_ref_flags(ev) & EVLIST_INSERTED &&
          !shuttingDown_) {
        release = false;
        eb_event_modify_inserted(*event, ioSqe);
      }
      ioSqe->useCount_--;
    } else {
      ioSqe->processActive();
    }
    if (release) {
      releaseIoSqe(ioSqe);
    }
  }

  return ret;
}

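// Main loop body: on the first call, registers the timer and signal fds;
// then repeatedly schedules the timerfd, submits pending SQEs, reaps
// completions, and dispatches timer, signal, and I/O callbacks until there
// is nothing left to wait for or EVLOOP_ONCE/EVLOOP_NONBLOCK ends the loop.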
int IoUringBackend::eb_event_base_loop(int flags) {
  if (registerDefaultFds_) {
    registerDefaultFds_ = false;
    if (!addTimerFd() || !addSignalFds()) {
      cleanup();
      throw NotAvailable("io_uring_submit error");
    }
  }

  // schedule the timers
  bool done = false;
  auto waitForEvents = (flags & EVLOOP_NONBLOCK) ? WaitForEventsMode::DONT_WAIT
                                                 : WaitForEventsMode::WAIT;
  while (!done) {
    scheduleTimeout();
    // check if we need to break here
    if (loopBreak_) {
      loopBreak_ = false;
      break;
    }

    submitList(submitList_, waitForEvents);

    if (!numInsertedEvents_ && timers_.empty() && signals_.empty()) {
      return 1;
    }

    uint64_t call_time = 0;
    if (eb_poll_loop_pre_hook) {
      eb_poll_loop_pre_hook(&call_time);
    }

    // do not wait for events if EVLOOP_NONBLOCK is set
    int ret = getActiveEvents(waitForEvents);

    if (eb_poll_loop_post_hook) {
      eb_poll_loop_post_hook(call_time, ret);
    }

    size_t numProcessedTimers = 0;

    // save the processTimers_
    // this means we've received a notification
    // and we need to add the timer fd back
    bool processTimersFlag = processTimers_;
    if (processTimers_ && !loopBreak_) {
      numProcessedTimers = processTimers();
      processTimers_ = false;
    }

    size_t numProcessedSignals = 0;

    if (processSignals_ && !loopBreak_) {
      numProcessedSignals = processSignals();
      processSignals_ = false;
    }

    if (!activeEvents_.empty() && !loopBreak_) {
      processActiveEvents();
      if (flags & EVLOOP_ONCE) {
        done = true;
      }
    } else if (flags & EVLOOP_NONBLOCK) {
      if (signals_.empty()) {
        done = true;
      }
    }

    if (!done && (numProcessedTimers || numProcessedSignals) &&
        (flags & EVLOOP_ONCE)) {
      done = true;
    }

    if (processTimersFlag) {
      addTimerFd();
    }
  }

  return 0;
}

int IoUringBackend::eb_event_base_loopbreak() {
  loopBreak_ = true;

  return 0;
}

int IoUringBackend::eb_event_add(Event& event, const struct timeval* timeout) {
  auto* ev = event.getEvent();
  CHECK(ev);
  CHECK(!(event_ref_flags(ev) & ~EVLIST_ALL));
  // we do not support read/write timeouts
  if (timeout) {
    event_ref_flags(ev) |= EVLIST_TIMEOUT;
    addTimerEvent(event, timeout);
    return 0;
  }

  if (ev->ev_events & EV_SIGNAL) {
    event_ref_flags(ev) |= EVLIST_INSERTED;
    addSignalEvent(event);
    return 0;
  }

  if ((ev->ev_events & (EV_READ | EV_WRITE)) &&
      !(event_ref_flags(ev) & (EVLIST_INSERTED | EVLIST_ACTIVE))) {
    auto* ioSqe = allocIoSqe(event.getCallback());
    CHECK(ioSqe);
    ioSqe->event_ = &event;

    // just append it
    submitList_.push_back(*ioSqe);
    if (~event_ref_flags(ev) & EVLIST_INTERNAL) {
      numInsertedEvents_++;
    }
    event_ref_flags(ev) |= EVLIST_INSERTED;
    event.setUserData(ioSqe);
  }

  return 0;
}

int IoUringBackend::eb_event_del(Event& event) {
  if (!event.eb_ev_base()) {
    return -1;
  }

  auto* ev = event.getEvent();
  if (event_ref_flags(ev) & EVLIST_TIMEOUT) {
    event_ref_flags(ev) &= ~EVLIST_TIMEOUT;
    removeTimerEvent(event);
    return 1;
  }

  if (!(event_ref_flags(ev) & (EVLIST_ACTIVE | EVLIST_INSERTED))) {
    return -1;
  }

  if (ev->ev_events & EV_SIGNAL) {
    event_ref_flags(ev) &= ~(EVLIST_INSERTED | EVLIST_ACTIVE);
    removeSignalEvent(event);
    return 0;
  }

  auto* ioSqe = reinterpret_cast<IoSqe*>(event.getUserData());
  bool wasLinked = ioSqe->is_linked();
  ioSqe->resetEvent();

  // if the event is on the active list, we just clear the flags
  // and reset the event_ ptr
  if (event_ref_flags(ev) & EVLIST_ACTIVE) {
    event_ref_flags(ev) &= ~EVLIST_ACTIVE;
  }

  if (event_ref_flags(ev) & EVLIST_INSERTED) {
    event_ref_flags(ev) &= ~EVLIST_INSERTED;

    // not in use - we can cancel it
    if (!ioSqe->useCount_ && !wasLinked) {
      // io_cancel will attempt to cancel the event. the result is
      // EINVAL - usually the event has already been delivered
      // EINPROGRESS - cancellation in progress
      // EFAULT - bad ctx
      // regardless, we want to dec the numInsertedEvents_
      // since even if the events get delivered, the event ptr is nullptr
      int ret = cancelOne(ioSqe);
      if (ret < 0) {
        // release the ioSqe
        releaseIoSqe(ioSqe);
      }
    } else {
      if (!ioSqe->useCount_) {
        releaseIoSqe(ioSqe);
      }
    }

    if (~event_ref_flags(ev) & EVLIST_INTERNAL) {
      CHECK_GT(numInsertedEvents_, 0);
      numInsertedEvents_--;
    }

    return 0;
  } else {
    // we can have an EVLIST_ACTIVE event
    // which does not have the EVLIST_INSERTED flag set
    // so we need to release it here
    releaseIoSqe(ioSqe);
  }

  return -1;
}

int IoUringBackend::eb_event_modify_inserted(Event& event, IoSqe* ioSqe) {
  // unlink and append
  ioSqe->unlink();
  submitList_.push_back(*ioSqe);
  event.setUserData(ioSqe);

  return 0;
}

int IoUringBackend::submitOne() {
  return submitBusyCheck(1, WaitForEventsMode::DONT_WAIT);
}

int IoUringBackend::cancelOne(IoSqe* ioSqe) {
  auto* rentry = static_cast<IoSqe*>(allocIoSqe(EventCallback()));
  if (!rentry) {
    return 0;
  }

  auto* sqe = get_sqe();
  CHECK(sqe);

  rentry->prepCancel(sqe, ioSqe); // prev entry

  int ret = submitBusyCheck(1, WaitForEventsMode::DONT_WAIT);

  if (ret < 0) {
    // release the sqe
    releaseIoSqe(rentry);
  }

  return ret;
}

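// Reaps up to options_.maxGet completions. In WAIT mode it blocks (or busy
// polls the CQ when POLL_CQ is set) until at least one CQE is available;
// otherwise it only peeks. Returns the number of CQEs processed.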
int IoUringBackend::getActiveEvents(WaitForEventsMode waitForEvents) {
  size_t i = 0;
  struct io_uring_cqe* cqe = nullptr;
  // we can be called from the submitList() method
  // or with non blocking flags
  if (FOLLY_LIKELY(waitForEvents == WaitForEventsMode::WAIT)) {
    // if polling the CQ, busy wait for one entry
    if (options_.flags & Options::Flags::POLL_CQ) {
      do {
        ::io_uring_peek_cqe(&ioRing_, &cqe);
        asm_volatile_pause();
        // call the loop callback if installed
        // we call it every time we poll for a CQE
        // regardless of the io_uring_peek_cqe result
        if (cqPollLoopCallback_) {
          cqPollLoopCallback_();
        }
      } while (!cqe);
    } else {
      ::io_uring_wait_cqe(&ioRing_, &cqe);
    }
  } else {
    ::io_uring_peek_cqe(&ioRing_, &cqe);
  }
  while (cqe && (i < options_.maxGet)) {
    i++;
    IoSqe* sqe = reinterpret_cast<IoSqe*>(io_uring_cqe_get_data(cqe));
    sqe->backendCb_(this, sqe, cqe->res);
    ::io_uring_cqe_seen(&ioRing_, cqe);
    cqe = nullptr;
    ::io_uring_peek_cqe(&ioRing_, &cqe);
  }

  return static_cast<int>(i);
}

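// Submits num prepared SQEs, retrying on EBUSY (after draining some CQEs)
// and on EINTR. In WAIT mode without CQ polling it uses
// io_uring_submit_and_wait() so the call also waits for a completion.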
int IoUringBackend::submitBusyCheck(int num, WaitForEventsMode waitForEvents) {
  int i = 0;
  int res;
  while (i < num) {
    if (waitForEvents == WaitForEventsMode::WAIT) {
      if (options_.flags & Options::Flags::POLL_CQ) {
        res = ::io_uring_submit(&ioRing_);
      } else {
        res = ::io_uring_submit_and_wait(&ioRing_, 1);
      }
    } else {
      res = ::io_uring_submit(&ioRing_);
    }
    if (res == -EBUSY) {
      // if we get EBUSY, try to consume some CQ entries
      getActiveEvents(WaitForEventsMode::DONT_WAIT);
      continue;
    }
    if (res < 0) {
      // continue if interrupted
      if (errno == EINTR) {
        continue;
      }

      return res;
    }

    // we do not have any other entries to submit
    if (res == 0) {
      break;
    }

    i += res;

    // if polling the CQ, busy wait for one entry
    if (waitForEvents == WaitForEventsMode::WAIT &&
        options_.flags & Options::Flags::POLL_CQ && i == num) {
      struct io_uring_cqe* cqe = nullptr;
      while (!cqe) {
        ::io_uring_peek_cqe(&ioRing_, &cqe);
      }
    }
  }

  return num;
}

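// Drains the given list of prepared IoSqe entries into the SQ, flushing via
// submitBusyCheck() in batches of at most options_.maxSubmit and once more
// for the final partial batch. Returns the total number submitted.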
size_t IoUringBackend::submitList(
    IoSqeList& ioSqes, WaitForEventsMode waitForEvents) {
  int i = 0;
  size_t ret = 0;

  while (!ioSqes.empty()) {
    auto* entry = &ioSqes.front();
    ioSqes.pop_front();
    auto* sqe = get_sqe();
    CHECK(sqe); // this should not happen

    entry->processSubmit(sqe);
    i++;

    if (ioSqes.empty()) {
      int num = submitBusyCheck(i, waitForEvents);
      CHECK_EQ(num, i);
      ret += i;
    } else {
      if (static_cast<size_t>(i) == options_.maxSubmit) {
        int num = submitBusyCheck(i, WaitForEventsMode::DONT_WAIT);
        CHECK_EQ(num, i);
        ret += i;
        i = 0;
      }
    }
  }

  return ret;
}

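// A minimal usage sketch for the file-op API below (illustrative only; it
// assumes FileOpCallback is invocable with the operation's int result, and
// the buffer/fd names are hypothetical):
//
//   folly::IoUringBackend::Options opts;
//   opts.setCapacity(1024);
//   folly::IoUringBackend backend(opts);
//   char buf[4096];
//   backend.queueRead(fd, buf, sizeof(buf), 0 /*offset*/, [](int res) {
//     // res is the io_uring completion result (bytes read or -errno)
//   });
//   // the callback runs from the owning event loop once the CQE is reaped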
void IoUringBackend::queueRead(
    int fd, void* buf, unsigned int nbytes, off_t offset, FileOpCallback&& cb) {
  struct iovec iov {
    buf, nbytes
  };
  auto* ioSqe = new ReadIoSqe(this, fd, &iov, offset, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueWrite(
    int fd,
    const void* buf,
    unsigned int nbytes,
    off_t offset,
    FileOpCallback&& cb) {
  struct iovec iov {
    const_cast<void*>(buf), nbytes
  };
  auto* ioSqe = new WriteIoSqe(this, fd, &iov, offset, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueReadv(
    int fd,
    Range<const struct iovec*> iovecs,
    off_t offset,
    FileOpCallback&& cb) {
  auto* ioSqe = new ReadvIoSqe(this, fd, iovecs, offset, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueWritev(
    int fd,
    Range<const struct iovec*> iovecs,
    off_t offset,
    FileOpCallback&& cb) {
  auto* ioSqe = new WritevIoSqe(this, fd, iovecs, offset, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueFsync(int fd, FileOpCallback&& cb) {
  queueFsync(fd, FSyncFlags::FLAGS_FSYNC, std::move(cb));
}

void IoUringBackend::queueFdatasync(int fd, FileOpCallback&& cb) {
  queueFsync(fd, FSyncFlags::FLAGS_FDATASYNC, std::move(cb));
}

void IoUringBackend::queueFsync(int fd, FSyncFlags flags, FileOpCallback&& cb) {
  auto* ioSqe = new FSyncIoSqe(this, fd, flags, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueOpenat(
    int dfd, const char* path, int flags, mode_t mode, FileOpCallback&& cb) {
  auto* ioSqe = new FOpenAtIoSqe(this, dfd, path, flags, mode, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueOpenat2(
    int dfd, const char* path, struct open_how* how, FileOpCallback&& cb) {
  auto* ioSqe = new FOpenAt2IoSqe(this, dfd, path, how, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueClose(int fd, FileOpCallback&& cb) {
  auto* ioSqe = new FCloseIoSqe(this, fd, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueFallocate(
    int fd, int mode, off_t offset, off_t len, FileOpCallback&& cb) {
  auto* ioSqe = new FAllocateIoSqe(this, fd, mode, offset, len, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueSendmsg(
    int fd, const struct msghdr* msg, unsigned int flags, FileOpCallback&& cb) {
  auto* ioSqe = new SendmsgIoSqe(this, fd, msg, flags, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueRecvmsg(
    int fd, struct msghdr* msg, unsigned int flags, FileOpCallback&& cb) {
  auto* ioSqe = new RecvmsgIoSqe(this, fd, msg, flags, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::processFileOp(IoSqe* sqe, int64_t res) noexcept {
  auto* ioSqe = reinterpret_cast<FileOpIoSqe*>(sqe);
  // save the res
  ioSqe->res_ = res;
  activeEvents_.push_back(*ioSqe);
  numInsertedEvents_--;
}

} // namespace folly

#endif // __has_include(<liburing.h>)