/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <signal.h>

#include <folly/FileUtil.h>
#include <folly/Likely.h>
#include <folly/SpinLock.h>
#include <folly/String.h>
#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <folly/experimental/io/IoUringBackend.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/Sockets.h>
#include <folly/synchronization/CallOnce.h>

#if __has_include(<sys/timerfd.h>)
#include <sys/timerfd.h>
#endif

#if __has_include(<liburing.h>)

extern "C" FOLLY_ATTR_WEAK void eb_poll_loop_pre_hook(uint64_t* call_time);
extern "C" FOLLY_ATTR_WEAK void eb_poll_loop_post_hook(
    uint64_t call_time, int ret);

namespace {
struct SignalRegistry {
  struct SigInfo {
    struct sigaction sa_ {};
    size_t refs_{0};
  };
  using SignalMap = std::map<int, SigInfo>;

  constexpr SignalRegistry() {}

  void notify(int sig);
  void setNotifyFd(int sig, int fd);

  // lock protecting the signal map
  folly::MicroSpinLock mapLock_ = {0};
  std::unique_ptr<SignalMap> map_;
  std::atomic<int> notifyFd_{-1};
};

SignalRegistry& getSignalRegistry() {
  static auto& sInstance = *new SignalRegistry();
  return sInstance;
}

void evSigHandler(int sig) {
  getSignalRegistry().notify(sig);
}

void SignalRegistry::notify(int sig) {
  // use try_lock in case somebody already has the lock
  std::unique_lock<folly::MicroSpinLock> lk(mapLock_, std::try_to_lock);
  if (lk.owns_lock()) {
    int fd = notifyFd_.load();
    if (fd >= 0) {
      uint8_t sigNum = static_cast<uint8_t>(sig);
      ::write(fd, &sigNum, 1);
    }
  }
}

void SignalRegistry::setNotifyFd(int sig, int fd) {
  std::lock_guard<folly::MicroSpinLock> g(mapLock_);
  if (fd >= 0) {
    if (!map_) {
      map_ = std::make_unique<SignalMap>();
    }
    // switch the fd
    notifyFd_.store(fd);

    auto iter = (*map_).find(sig);
    if (iter != (*map_).end()) {
      iter->second.refs_++;
    } else {
      auto& entry = (*map_)[sig];
      entry.refs_ = 1;
      struct sigaction sa = {};
      sa.sa_handler = evSigHandler;
      sa.sa_flags |= SA_RESTART;
      ::sigfillset(&sa.sa_mask);

      if (::sigaction(sig, &sa, &entry.sa_) == -1) {
        (*map_).erase(sig);
      }
    }
  } else {
    notifyFd_.store(fd);

    if (map_) {
      auto iter = (*map_).find(sig);
      if ((iter != (*map_).end()) && (--iter->second.refs_ == 0)) {
        auto entry = iter->second;
        (*map_).erase(iter);
        // just restore
        ::sigaction(sig, &entry.sa_, nullptr);
      }
    }
  }
}
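
// Usage sketch (illustrative only): the backend below passes the write end
// of its internal socket pair when a signal event is added, so
// evSigHandler() can forward the signal number from async-signal context
// with a single write(), and passes -1 on removal to restore the previous
// handler:
//
//   getSignalRegistry().setNotifyFd(SIGTERM, signalFds_.writeFd());
//   getSignalRegistry().setNotifyFd(SIGTERM, -1);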

} // namespace

namespace {
class SQGroupInfoRegistry {
 private:
  // a group is a collection of io_uring instances
  // that share up to numThreads SQ poll threads
  struct SQGroupInfo {
    struct SQSubGroupInfo {
      folly::F14FastSet<int> fds;
      size_t count{0};

      void add(int fd) {
        CHECK(fds.find(fd) == fds.end());
        fds.insert(fd);
        ++count;
      }

      size_t remove(int fd) {
        auto iter = fds.find(fd);
        CHECK(iter != fds.end());
        fds.erase(fd);
        --count;

        return count;
      }
    };

    SQGroupInfo(size_t num, std::set<uint32_t> const& cpus) : subGroups(num) {
      for (const uint32_t cpu : cpus) {
        nextCpu.emplace_back(cpu);
      }
    }

    // returns the least loaded subgroup
    SQSubGroupInfo* getNextSubgroup() {
      size_t min_idx = 0;
      for (size_t i = 0; i < subGroups.size(); i++) {
        if (subGroups[i].count == 0) {
          return &subGroups[i];
        }

        if (subGroups[i].count < subGroups[min_idx].count) {
          min_idx = i;
        }
      }

      return &subGroups[min_idx];
    }
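
    // Example: with subgroup loads {2, 3, 1} the scan above returns
    // subgroup 2 (the least loaded); an empty subgroup short-circuits the
    // scan and is returned immediately.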

    size_t add(int fd, SQSubGroupInfo* sg) {
      CHECK(fdSgMap.find(fd) == fdSgMap.end());
      fdSgMap.insert(std::make_pair(fd, sg));
      sg->add(fd);
      ++count;

      return count;
    }

    size_t remove(int fd) {
      auto iter = fdSgMap.find(fd);
      CHECK(fdSgMap.find(fd) != fdSgMap.end());
      iter->second->remove(fd);
      fdSgMap.erase(iter);
      --count;

      return count;
    }

    // file descriptor to sub group index map
    folly::F14FastMap<int, SQSubGroupInfo*> fdSgMap;
    // array of subgroups
    std::vector<SQSubGroupInfo> subGroups;
    // number of entries
    size_t count{0};
    // Set of CPUs we will bind threads to.
    std::vector<uint32_t> nextCpu;
    int nextCpuIndex{0};
  };

  using SQGroupInfoMap = folly::F14FastMap<std::string, SQGroupInfo>;
  SQGroupInfoMap map_;
  std::mutex mutex_;

 public:
  SQGroupInfoRegistry() = default;
  ~SQGroupInfoRegistry() = default;

  using FDCreateFunc = folly::Function<int(struct io_uring_params&)>;
  using FDCloseFunc = folly::Function<void()>;

  size_t addTo(
      const std::string& groupName,
      size_t groupNumThreads,
      FDCreateFunc& createFd,
      struct io_uring_params& params,
      std::set<uint32_t> const& cpus) {
    if (groupName.empty()) {
      createFd(params);
      return 0;
    }

    std::lock_guard g(mutex_);

    SQGroupInfo::SQSubGroupInfo* sg = nullptr;
    SQGroupInfo* info = nullptr;
    auto iter = map_.find(groupName);
    if (iter != map_.end()) {
      info = &iter->second;
    } else {
      // First use of this group.
      SQGroupInfo gr(groupNumThreads, cpus);
      info =
          &map_.insert(std::make_pair(groupName, std::move(gr))).first->second;
    }
    sg = info->getNextSubgroup();
    if (sg->count) {
      // we're adding to a non empty subgroup
      params.wq_fd = *(sg->fds.begin());
      params.flags |= IORING_SETUP_ATTACH_WQ;
    } else {
      // First use of this subgroup, pin thread to CPU if specified.
      if (info->nextCpu.size()) {
        uint32_t cpu = info->nextCpu[info->nextCpuIndex];
        info->nextCpuIndex = (info->nextCpuIndex + 1) % info->nextCpu.size();

        params.sq_thread_cpu = cpu;
        params.flags |= IORING_SETUP_SQ_AFF;
      }
    }

    auto fd = createFd(params);
    if (fd < 0) {
      return 0;
    }

    return info->add(fd, sg);
  }

  size_t removeFrom(const std::string& groupName, int fd, FDCloseFunc& func) {
    if (groupName.empty()) {
      func();
      return 0;
    }

    size_t ret;

    std::lock_guard g(mutex_);

    func();

    auto iter = map_.find(groupName);
    CHECK(iter != map_.end());
    // check for empty group
    if ((ret = iter->second.remove(fd)) == 0) {
      map_.erase(iter);
    }

    return ret;
  }
};
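
// Sharing sketch (illustrative only): the first ring created in a subgroup
// gets its own SQ poll thread (optionally pinned via IORING_SETUP_SQ_AFF);
// later rings in the same subgroup attach to that thread's workqueue with
// IORING_SETUP_ATTACH_WQ, so a group with N subgroups never spawns more
// than N kernel poll threads no matter how many backends join it.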

static folly::Indestructible<SQGroupInfoRegistry> sSQGroupInfoRegistry;

} // namespace

namespace folly {
IoUringBackend::TimerEntry::TimerEntry(
    Event* event, const struct timeval& timeout)
    : event_(event) {
  setExpireTime(timeout, std::chrono::steady_clock::now());
}

IoUringBackend::SocketPair::SocketPair() {
  if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds_.data())) {
    throw std::runtime_error("socketpair error");
  }

  // set the sockets to non blocking mode
  for (auto fd : fds_) {
    auto flags = ::fcntl(fd, F_GETFL, 0);
    ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  }
}

IoUringBackend::SocketPair::~SocketPair() {
  for (auto fd : fds_) {
    if (fd >= 0) {
      ::close(fd);
    }
  }
}

IoUringBackend::FdRegistry::FdRegistry(struct io_uring& ioRing, size_t n)
    : ioRing_(ioRing), files_(n, -1), inUse_(n), records_(n) {}

int IoUringBackend::FdRegistry::init() {
  if (inUse_) {
    int ret = ::io_uring_register_files(&ioRing_, files_.data(), inUse_);

    if (!ret) {
      // build and set the free list head if we succeed
      for (size_t i = 0; i < records_.size(); i++) {
        records_[i].idx_ = i;
        free_.push_front(records_[i]);
      }
    } else {
      LOG(ERROR) << "io_uring_register_files(" << inUse_ << ") "
                 << "failed errno = " << errno << ":\""
                 << folly::errnoStr(errno) << "\" " << this;
    }

    return ret;
  }

  return 0;
}

IoUringBackend::FdRegistrationRecord* IoUringBackend::FdRegistry::alloc(
    int fd) {
  if (FOLLY_UNLIKELY(err_ || free_.empty())) {
    return nullptr;
  }

  auto& record = free_.front();

  // now we have an idx
  int ret = ::io_uring_register_files_update(&ioRing_, record.idx_, &fd, 1);
  if (ret != 1) {
    // set the err_ flag so we do not retry again
    // this usually happens when we hit the file desc limit
    // and retrying this operation for every request is expensive
    err_ = true;
    LOG(ERROR) << "io_uring_register_files(1) "
               << "failed errno = " << errno << ":\"" << folly::errnoStr(errno)
               << "\" " << this;
    return nullptr;
  }

  record.fd_ = fd;
  record.count_ = 1;
  free_.pop_front();

  return &record;
}

bool IoUringBackend::FdRegistry::free(
    IoUringBackend::FdRegistrationRecord* record) {
  if (record && (--record->count_ == 0)) {
    record->fd_ = -1;
    int ret = ::io_uring_register_files_update(
        &ioRing_, record->idx_, &record->fd_, 1);

    // we add it to the free list anyway here
    free_.push_front(*record);

    return (ret == 1);
  }

  return false;
}
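
// Registered-files sketch (illustrative only): alloc() swaps the caller's fd
// into a pre-registered slot via io_uring_register_files_update(), so later
// SQEs can reference the slot index with IOSQE_FIXED_FILE instead of having
// the kernel take a reference on the fd for every operation; free() parks -1
// back into the slot and recycles the record on the free list.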

IoUringBackend::IoUringBackend(Options options)
    : options_(options),
      numEntries_(options.capacity),
      fdRegistry_(ioRing_, options.useRegisteredFds ? options.capacity : 0) {
  // create the timer fd
  timerFd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
  if (timerFd_ < 0) {
    throw std::runtime_error("timerfd_create error");
  }

  ::memset(&ioRing_, 0, sizeof(ioRing_));
  ::memset(&params_, 0, sizeof(params_));

  params_.flags |= IORING_SETUP_CQSIZE;
  params_.cq_entries = options.capacity;

  // poll SQ options
  if (options.flags & Options::Flags::POLL_SQ) {
    params_.flags |= IORING_SETUP_SQPOLL;
    params_.sq_thread_idle = options.sqIdle.count();
  }

  SQGroupInfoRegistry::FDCreateFunc func = [&](struct io_uring_params& params) {
    while (true) {
      // allocate entries both for poll add and cancel
      if (::io_uring_queue_init_params(
              2 * options_.maxSubmit, &ioRing_, &params)) {
        options.capacity /= 2;
        if (options.minCapacity && (options.capacity >= options.minCapacity)) {
          LOG(INFO) << "io_uring_queue_init_params(" << 2 * options_.maxSubmit
                    << "," << params.cq_entries << ") "
                    << "failed errno = " << errno << ":\""
                    << folly::errnoStr(errno) << "\" " << this
                    << " retrying with capacity = " << options.capacity;

          params_.cq_entries = options.capacity;
          numEntries_ = options.capacity;
        } else {
          LOG(ERROR) << "io_uring_queue_init_params(" << 2 * options_.maxSubmit
                     << "," << params.cq_entries << ") "
                     << "failed errno = " << errno << ":\""
                     << folly::errnoStr(errno) << "\" " << this;

          throw NotAvailable("io_uring_queue_init error");
        }
      } else {
        // success - break
        break;
      }
    }

    return ioRing_.ring_fd;
  };

  auto ret = sSQGroupInfoRegistry->addTo(
      options_.sqGroupName,
      options_.sqGroupNumThreads,
      func,
      params_,
      options.sqCpus);

  if (!options_.sqGroupName.empty()) {
    LOG(INFO) << "Adding to SQ poll group \"" << options_.sqGroupName
              << "\" ret = " << ret << " fd = " << ioRing_.ring_fd;
  }

  numEntries_ *= 2;

  // timer entry
  timerEntry_ = std::make_unique<IoSqe>(this, false, true /*persist*/);
  timerEntry_->backendCb_ = IoUringBackend::processTimerIoSqe;

  // signal entry
  signalReadEntry_ = std::make_unique<IoSqe>(this, false, true /*persist*/);
  signalReadEntry_->backendCb_ = IoUringBackend::processSignalReadIoSqe;

  // we need to call the init before adding the timer fd
  // so we avoid a deadlock - waiting for the queue to be drained
  if (options.useRegisteredFds) {
    // now init the file registry
    // if this fails, we still continue since we
    // can run without registered fds
    fdRegistry_.init();
  }

  // delay adding the timer and signal fds until running the loop first time
}
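
// Construction sketch (illustrative only; setter names other than
// setCapacity() are assumptions, see the Options API in the header):
//
//   folly::IoUringBackend::Options options;
//   options.setCapacity(1024); // CQ size; halved on init failure, down to
//                              // minCapacity, before giving up
//   folly::IoUringBackend backend(options); // throws NotAvailable on failure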

IoUringBackend::~IoUringBackend() {
  shuttingDown_ = true;

  cleanup();

  CHECK(!timerEntry_);
  CHECK(!signalReadEntry_);
  CHECK(freeList_.empty());

  ::close(timerFd_);
}

void IoUringBackend::cleanup() {
  if (ioRing_.ring_fd > 0) {
    // release the nonsubmitted items from the submitList
    while (!submitList_.empty()) {
      auto* ioSqe = &submitList_.front();
      submitList_.pop_front();
      releaseIoSqe(ioSqe);
    }

    // release the active events
    while (!activeEvents_.empty()) {
      auto* ioSqe = &activeEvents_.front();
      activeEvents_.pop_front();
      releaseIoSqe(ioSqe);
    }

    // wait for the outstanding events to finish
    while (numIoSqeInUse()) {
      struct io_uring_cqe* cqe = nullptr;
      ::io_uring_wait_cqe(&ioRing_, &cqe);
      if (cqe) {
        IoSqe* sqe = reinterpret_cast<IoSqe*>(io_uring_cqe_get_data(cqe));
        releaseIoSqe(sqe);
        ::io_uring_cqe_seen(&ioRing_, cqe);
      }
    }

    // free the entries
    timerEntry_.reset();
    signalReadEntry_.reset();
    freeList_.clear_and_dispose([](auto _) { delete _; });

    int fd = ioRing_.ring_fd;
    SQGroupInfoRegistry::FDCloseFunc func = [&]() {
      // exit now
      ::io_uring_queue_exit(&ioRing_);
      ioRing_.ring_fd = -1;
    };

    auto ret = sSQGroupInfoRegistry->removeFrom(
        options_.sqGroupName, ioRing_.ring_fd, func);

    if (!options_.sqGroupName.empty()) {
      LOG(INFO) << "Removing from SQ poll group \"" << options_.sqGroupName
                << "\" ret = " << ret << " fd = " << fd;
    }
  }
}

bool IoUringBackend::isAvailable() {
  static bool sAvailable = true;

  static folly::once_flag initFlag;
  folly::call_once(initFlag, [&]() {
    try {
      Options options;
      options.setCapacity(1024);
      IoUringBackend backend(options);
    } catch (const NotAvailable&) {
      sAvailable = false;
    }
  });

  return sAvailable;
}
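
// Callers are expected to probe availability before constructing a backend
// on kernels without io_uring support, e.g.:
//
//   if (folly::IoUringBackend::isAvailable()) {
//     /* safe to construct an IoUringBackend */
//   }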

bool IoUringBackend::addTimerFd() {
  auto* entry = allocSubmissionEntry(); // this can be nullptr
  timerEntry_->prepPollAdd(entry, timerFd_, POLLIN, true /*registerFd*/);
  return (1 == submitOne());
}

bool IoUringBackend::addSignalFds() {
  auto* entry = allocSubmissionEntry(); // this can be nullptr
  signalReadEntry_->prepPollAdd(
      entry, signalFds_.readFd(), POLLIN, false /*registerFd*/);

  return (1 == submitOne());
}

void IoUringBackend::scheduleTimeout() {
  if (!timerChanged_) {
    return;
  }

  // reset
  timerChanged_ = false;
  if (!timers_.empty()) {
    auto delta = timers_.begin()->second[0].getRemainingTime(
        std::chrono::steady_clock::now());
    if (delta.count() < 1000) {
      delta = std::chrono::microseconds(1000);
    }
    scheduleTimeout(delta);
  } else {
    scheduleTimeout(std::chrono::microseconds(0)); // disable
  }

  // we do not call addTimerFd() here
  // since it has to be added only once, after
  // we process a poll callback
}

void IoUringBackend::scheduleTimeout(const std::chrono::microseconds& us) {
  struct itimerspec val;
  val.it_interval = {0, 0};
  val.it_value.tv_sec =
      std::chrono::duration_cast<std::chrono::seconds>(us).count();
  val.it_value.tv_nsec =
      std::chrono::duration_cast<std::chrono::nanoseconds>(us).count() %
      1000000000LL;

  CHECK_EQ(::timerfd_settime(timerFd_, 0, &val, nullptr), 0);
}
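
// Worked example: us = 1,500,000us splits into tv_sec = 1 and
// tv_nsec = 1,500,000,000 % 1,000,000,000 = 500,000,000; us = 0 yields an
// all-zero it_value, which disarms the timerfd.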

void IoUringBackend::addTimerEvent(
    Event& event, const struct timeval* timeout) {
  // first try to remove if already existing
  auto iter1 = eventToTimers_.find(&event);
  if (iter1 != eventToTimers_.end()) {
    // no need to remove it from eventToTimers_
    auto expireTime = iter1->second;
    auto iter2 = timers_.find(expireTime);
    for (auto iter = iter2->second.begin(), last = iter2->second.end();
         iter != last;
         ++iter) {
      if (iter->event_ == &event) {
        iter2->second.erase(iter);
        break;
      }
    }

    if (iter2->second.empty()) {
      timers_.erase(iter2);
    }
  }

  TimerEntry entry(&event, *timeout);
  if (!timerChanged_) {
    timerChanged_ =
        timers_.empty() || (entry.expireTime_ < timers_.begin()->first);
  }
  timers_[entry.expireTime_].push_back(entry);
  eventToTimers_[&event] = entry.expireTime_;
}

void IoUringBackend::removeTimerEvent(Event& event) {
  auto iter1 = eventToTimers_.find(&event);
  CHECK(iter1 != eventToTimers_.end());
  auto expireTime = iter1->second;
  eventToTimers_.erase(iter1);

  auto iter2 = timers_.find(expireTime);
  CHECK(iter2 != timers_.end());

  for (auto iter = iter2->second.begin(), last = iter2->second.end();
       iter != last;
       ++iter) {
    if (iter->event_ == &event) {
      iter2->second.erase(iter);
      break;
    }
  }

  if (iter2->second.empty()) {
    if (!timerChanged_) {
      timerChanged_ = (iter2 == timers_.begin());
    }
    timers_.erase(iter2);
  }
}

size_t IoUringBackend::processTimers() {
  size_t ret = 0;
  uint64_t data = 0;
  // this read can fail, but that is OK since the fd
  // will still be readable
  folly::readNoInt(timerFd_, &data, sizeof(data));

  auto now = std::chrono::steady_clock::now();
  while (!timers_.empty() && (now >= timers_.begin()->first)) {
    if (!timerChanged_) {
      timerChanged_ = true;
    }
    auto vec = std::move(timers_.begin()->second);
    timers_.erase(timers_.begin());
    for (auto& entry : vec) {
      ret++;
      eventToTimers_.erase(entry.event_);
      auto* ev = entry.event_->getEvent();
      ev->ev_res = EV_TIMEOUT;
      event_ref_flags(ev).get() = EVLIST_INIT;
      (*event_ref_callback(ev))((int)ev->ev_fd, ev->ev_res, event_ref_arg(ev));
    }
  }

  return ret;
}

void IoUringBackend::addSignalEvent(Event& event) {
  auto* ev = event.getEvent();
  signals_[ev->ev_fd].insert(&event);

  // we pass the write fd for notifications
  getSignalRegistry().setNotifyFd(ev->ev_fd, signalFds_.writeFd());
}

void IoUringBackend::removeSignalEvent(Event& event) {
  auto* ev = event.getEvent();
  auto iter = signals_.find(ev->ev_fd);
  if (iter != signals_.end()) {
    getSignalRegistry().setNotifyFd(ev->ev_fd, -1);
  }
}

size_t IoUringBackend::processSignals() {
  size_t ret = 0;
  static constexpr auto kNumEntries = NSIG * 2;
  static_assert(
      NSIG < 256, "Use a different data type to cover all the signal values");
  std::array<bool, NSIG> processed{};
  std::array<uint8_t, kNumEntries> signals;

  ssize_t num =
      folly::readNoInt(signalFds_.readFd(), signals.data(), signals.size());
  for (ssize_t i = 0; i < num; i++) {
    int signum = static_cast<int>(signals[i]);
    if ((signum >= 0) && (signum < static_cast<int>(processed.size())) &&
        !processed[signum]) {
      processed[signum] = true;
      auto iter = signals_.find(signum);
      if (iter != signals_.end()) {
        auto& set = iter->second;
        for (auto& event : set) {
          auto* ev = event->getEvent();
          ev->ev_res = 0;
          event_ref_flags(ev) |= EVLIST_ACTIVE;
          (*event_ref_callback(ev))(
              (int)ev->ev_fd, ev->ev_res, event_ref_arg(ev));
          event_ref_flags(ev) &= ~EVLIST_ACTIVE;
        }
      }
    }
  }
  // add the signal fd(s) back
  addSignalFds();
  return ret;
}

IoUringBackend::IoSqe* IoUringBackend::allocIoSqe(const EventCallback& cb) {
  // try to allocate from the pool first
  if ((cb.type_ == EventCallback::Type::TYPE_NONE) && (!freeList_.empty())) {
    auto* ret = &freeList_.front();
    freeList_.pop_front();
    numIoSqeInUse_++;
    return ret;
  }

  // alloc a new IoSqe
  auto* ret = allocNewIoSqe(cb);
  if (FOLLY_LIKELY(!!ret)) {
    numIoSqeInUse_++;
  }

  return ret;
}

void IoUringBackend::releaseIoSqe(IoUringBackend::IoSqe* aioIoSqe) {
  CHECK_GT(numIoSqeInUse_, 0);
  aioIoSqe->cbData_.releaseData();
  // unregister the file descriptor record
  if (aioIoSqe->fdRecord_) {
    unregisterFd(aioIoSqe->fdRecord_);
    aioIoSqe->fdRecord_ = nullptr;
  }

  if (FOLLY_LIKELY(aioIoSqe->poolAlloc_)) {
    numIoSqeInUse_--;
    aioIoSqe->event_ = nullptr;
    freeList_.push_front(*aioIoSqe);
  } else {
    if (!aioIoSqe->persist_) {
      numIoSqeInUse_--;
      delete aioIoSqe;
    }
  }
}

void IoUringBackend::processPollIo(IoSqe* ioSqe, int64_t res) noexcept {
  auto* ev = ioSqe->event_ ? (ioSqe->event_->getEvent()) : nullptr;
  if (ev) {
    if (~event_ref_flags(ev) & EVLIST_INTERNAL) {
      // if this is not a persistent event
      // remove the EVLIST_INSERTED flags
      // and dec the numInsertedEvents_
      if (~ev->ev_events & EV_PERSIST) {
        DCHECK(numInsertedEvents_ > 0);
        numInsertedEvents_--;
        event_ref_flags(ev) &= ~EVLIST_INSERTED;
      }
    }

    // add it to the active list
    event_ref_flags(ev) |= EVLIST_ACTIVE;
    ev->ev_res = res;
    activeEvents_.push_back(*ioSqe);
  } else {
    releaseIoSqe(ioSqe);
  }
}

size_t IoUringBackend::processActiveEvents() {
  size_t ret = 0;
  IoSqe* ioSqe;

  while (!activeEvents_.empty() && !loopBreak_) {
    bool release = true;
    ioSqe = &activeEvents_.front();
    activeEvents_.pop_front();
    ret++;
    auto* event = ioSqe->event_;
    auto* ev = event ? event->getEvent() : nullptr;
    if (ev) {
      // remove it from the active list
      event_ref_flags(ev) &= ~EVLIST_ACTIVE;
      bool inserted = (event_ref_flags(ev) & EVLIST_INSERTED);

      // prevent the callback from freeing the aioIoSqe
      ioSqe->useCount_++;
      if (!ioSqe->cbData_.processCb(ev->ev_res)) {
        // adjust the ev_res for the poll case
        ev->ev_res = getPollEvents(ev->ev_res, ev->ev_events);
        // handle spurious poll events that return 0
        // this can happen during high load on process startup
        if (ev->ev_res) {
          (*event_ref_callback(ev))(
              (int)ev->ev_fd, ev->ev_res, event_ref_arg(ev));
        }
      }
      // get the event again
      event = ioSqe->event_;
      ev = event ? event->getEvent() : nullptr;
      if (ev && inserted && event_ref_flags(ev) & EVLIST_INSERTED &&
          !shuttingDown_) {
        release = false;
        eb_event_modify_inserted(*event, ioSqe);
      }
      ioSqe->useCount_--;
    } else {
      ioSqe->processActive();
    }
    if (release) {
      releaseIoSqe(ioSqe);
    }
  }

  return ret;
}

int IoUringBackend::eb_event_base_loop(int flags) {
  if (registerDefaultFds_) {
    registerDefaultFds_ = false;
    if (!addTimerFd() || !addSignalFds()) {
      cleanup();
      throw NotAvailable("io_uring_submit error");
    }
  }

  // schedule the timers
  bool done = false;
  auto waitForEvents = (flags & EVLOOP_NONBLOCK) ? WaitForEventsMode::DONT_WAIT
                                                 : WaitForEventsMode::WAIT;
  while (!done) {
    scheduleTimeout();
    // check if we need to break here
    if (loopBreak_) {
      loopBreak_ = false;
      break;
    }

    submitList(submitList_, waitForEvents);

    if (!numInsertedEvents_ && timers_.empty() && signals_.empty()) {
      return 1;
    }

    uint64_t call_time = 0;
    if (eb_poll_loop_pre_hook) {
      eb_poll_loop_pre_hook(&call_time);
    }

    // do not wait for events if EVLOOP_NONBLOCK is set
    int ret = getActiveEvents(waitForEvents);

    if (eb_poll_loop_post_hook) {
      eb_poll_loop_post_hook(call_time, ret);
    }

    size_t numProcessedTimers = 0;

    // save the processTimers_
    // this means we've received a notification
    // and we need to add the timer fd back
    bool processTimersFlag = processTimers_;
    if (processTimers_ && !loopBreak_) {
      numProcessedTimers = processTimers();
      processTimers_ = false;
    }

    size_t numProcessedSignals = 0;

    if (processSignals_ && !loopBreak_) {
      numProcessedSignals = processSignals();
      processSignals_ = false;
    }

    if (!activeEvents_.empty() && !loopBreak_) {
      processActiveEvents();
      if (flags & EVLOOP_ONCE) {
        done = true;
      }
    } else if (flags & EVLOOP_NONBLOCK) {
      if (signals_.empty()) {
        done = true;
      }
    }

    if (!done && (numProcessedTimers || numProcessedSignals) &&
        (flags & EVLOOP_ONCE)) {
      done = true;
    }

    if (processTimersFlag) {
      addTimerFd();
    }
  }

  return 0;
}
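
// Loop semantics follow libevent (illustrative only): EVLOOP_ONCE returns
// after one batch of callbacks, EVLOOP_NONBLOCK polls the CQ without
// waiting, and a return value of 1 means there was nothing left to wait on:
//
//   backend.eb_event_base_loop(EVLOOP_ONCE);     // block for one iteration
//   backend.eb_event_base_loop(EVLOOP_NONBLOCK); // drain ready events only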

int IoUringBackend::eb_event_base_loopbreak() {
  loopBreak_ = true;

  return 0;
}

int IoUringBackend::eb_event_add(Event& event, const struct timeval* timeout) {
  auto* ev = event.getEvent();
  CHECK(ev);
  CHECK(!(event_ref_flags(ev) & ~EVLIST_ALL));
  // we do not support read/write timeouts
  if (timeout) {
    event_ref_flags(ev) |= EVLIST_TIMEOUT;
    addTimerEvent(event, timeout);
    return 0;
  }

  if (ev->ev_events & EV_SIGNAL) {
    event_ref_flags(ev) |= EVLIST_INSERTED;
    addSignalEvent(event);
    return 0;
  }

  if ((ev->ev_events & (EV_READ | EV_WRITE)) &&
      !(event_ref_flags(ev) & (EVLIST_INSERTED | EVLIST_ACTIVE))) {
    auto* ioSqe = allocIoSqe(event.getCallback());
    CHECK(ioSqe);
    ioSqe->event_ = &event;

    // just append it
    submitList_.push_back(*ioSqe);
    if (~event_ref_flags(ev) & EVLIST_INTERNAL) {
      numInsertedEvents_++;
    }
    event_ref_flags(ev) |= EVLIST_INSERTED;
    event.setUserData(ioSqe);
  }

  return 0;
}

int IoUringBackend::eb_event_del(Event& event) {
  if (!event.eb_ev_base()) {
    return -1;
  }

  auto* ev = event.getEvent();
  if (event_ref_flags(ev) & EVLIST_TIMEOUT) {
    event_ref_flags(ev) &= ~EVLIST_TIMEOUT;
    removeTimerEvent(event);
    return 1;
  }

  if (!(event_ref_flags(ev) & (EVLIST_ACTIVE | EVLIST_INSERTED))) {
    return -1;
  }

  if (ev->ev_events & EV_SIGNAL) {
    event_ref_flags(ev) &= ~(EVLIST_INSERTED | EVLIST_ACTIVE);
    removeSignalEvent(event);
    return 0;
  }

  auto* ioSqe = reinterpret_cast<IoSqe*>(event.getUserData());
  bool wasLinked = ioSqe->is_linked();
  ioSqe->resetEvent();

  // if the event is on the active list, we just clear the flags
  // and reset the event_ ptr
  if (event_ref_flags(ev) & EVLIST_ACTIVE) {
    event_ref_flags(ev) &= ~EVLIST_ACTIVE;
  }

  if (event_ref_flags(ev) & EVLIST_INSERTED) {
    event_ref_flags(ev) &= ~EVLIST_INSERTED;

    // not in use - we can cancel it
    if (!ioSqe->useCount_ && !wasLinked) {
      // io_cancel will attempt to cancel the event. the result is
      // EINVAL - usually the event has already been delivered
      // EINPROGRESS - cancellation in progress
      // EFAULT - bad ctx
      // regardless, we want to dec the numInsertedEvents_
      // since even if the events get delivered, the event ptr is nullptr
      int ret = cancelOne(ioSqe);
      if (ret < 0) {
        // release the ioSqe
        releaseIoSqe(ioSqe);
      }
    } else {
      if (!ioSqe->useCount_) {
        releaseIoSqe(ioSqe);
      }
    }

    if (~event_ref_flags(ev) & EVLIST_INTERNAL) {
      CHECK_GT(numInsertedEvents_, 0);
      numInsertedEvents_--;
    }

    return 0;
  } else {
    // we can have an EVLIST_ACTIVE event
    // which does not have the EVLIST_INSERTED flag set
    // so we need to release it here
    releaseIoSqe(ioSqe);
  }

  return -1;
}

int IoUringBackend::eb_event_modify_inserted(Event& event, IoSqe* ioSqe) {
  // unlink and append
  ioSqe->unlink();
  submitList_.push_back(*ioSqe);
  event.setUserData(ioSqe);

  return 0;
}

int IoUringBackend::submitOne() {
  return submitBusyCheck(1, WaitForEventsMode::DONT_WAIT);
}

int IoUringBackend::cancelOne(IoSqe* ioSqe) {
  auto* rentry = static_cast<IoSqe*>(allocIoSqe(EventCallback()));
  if (!rentry) {
    return 0;
  }

  auto* sqe = get_sqe();
  CHECK(sqe);

  rentry->prepCancel(sqe, ioSqe); // prev entry

  int ret = submitBusyCheck(1, WaitForEventsMode::DONT_WAIT);

  if (ret < 0) {
    // release the sqe
    releaseIoSqe(rentry);
  }

  return ret;
}

int IoUringBackend::getActiveEvents(WaitForEventsMode waitForEvents) {
  size_t i = 0;
  struct io_uring_cqe* cqe = nullptr;
  // we can be called from the submitList() method
  // or with non blocking flags
  if (FOLLY_LIKELY(waitForEvents == WaitForEventsMode::WAIT)) {
    // if polling the CQ, busy wait for one entry
    if (options_.flags & Options::Flags::POLL_CQ) {
      do {
        ::io_uring_peek_cqe(&ioRing_, &cqe);
        asm_volatile_pause();
        // call the loop callback if installed
        // we call it every time we poll for a CQE
        // regardless of the io_uring_peek_cqe result
        if (cqPollLoopCallback_) {
          cqPollLoopCallback_();
        }
      } while (!cqe);
    } else {
      ::io_uring_wait_cqe(&ioRing_, &cqe);
    }
  } else {
    ::io_uring_peek_cqe(&ioRing_, &cqe);
  }
  while (cqe && (i < options_.maxGet)) {
    i++;
    IoSqe* sqe = reinterpret_cast<IoSqe*>(io_uring_cqe_get_data(cqe));
    sqe->backendCb_(this, sqe, cqe->res);
    ::io_uring_cqe_seen(&ioRing_, cqe);
    cqe = nullptr;
    ::io_uring_peek_cqe(&ioRing_, &cqe);
  }

  return static_cast<int>(i);
}

int IoUringBackend::submitBusyCheck(int num, WaitForEventsMode waitForEvents) {
  int i = 0;
  int res;
  while (i < num) {
    if (waitForEvents == WaitForEventsMode::WAIT) {
      if (options_.flags & Options::Flags::POLL_CQ) {
        res = ::io_uring_submit(&ioRing_);
      } else {
        res = ::io_uring_submit_and_wait(&ioRing_, 1);
      }
    } else {
      res = ::io_uring_submit(&ioRing_);
    }
    if (res == -EBUSY) {
      // if we get EBUSY, try to consume some CQ entries
      getActiveEvents(WaitForEventsMode::DONT_WAIT);
      continue;
    }
    if (res < 0) {
      // continue if interrupted
      if (errno == EINTR) {
        continue;
      }

      return res;
    }

    // we do not have any other entries to submit
    if (res == 0) {
      break;
    }

    i += res;

    // if polling the CQ, busy wait for one entry
    if (waitForEvents == WaitForEventsMode::WAIT &&
        options_.flags & Options::Flags::POLL_CQ && i == num) {
      struct io_uring_cqe* cqe = nullptr;
      while (!cqe) {
        ::io_uring_peek_cqe(&ioRing_, &cqe);
      }
    }
  }

  return num;
}
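
// Backpressure sketch (illustrative only): -EBUSY from io_uring_submit()
// means the completion ring is too full to accept more submissions, so the
// loop above drains completions with getActiveEvents(DONT_WAIT) and retries
// rather than dropping the queued SQEs.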

size_t IoUringBackend::submitList(
    IoSqeList& ioSqes, WaitForEventsMode waitForEvents) {
  int i = 0;
  size_t ret = 0;

  while (!ioSqes.empty()) {
    auto* entry = &ioSqes.front();
    ioSqes.pop_front();
    auto* sqe = get_sqe();
    CHECK(sqe); // this should not happen

    entry->processSubmit(sqe);
    i++;

    if (ioSqes.empty()) {
      int num = submitBusyCheck(i, waitForEvents);
      CHECK_EQ(num, i);
      ret += i;
    } else {
      if (static_cast<size_t>(i) == options_.maxSubmit) {
        int num = submitBusyCheck(i, WaitForEventsMode::DONT_WAIT);
        CHECK_EQ(num, i);
        ret += i;
        i = 0;
      }
    }
  }

  return ret;
}

void IoUringBackend::queueRead(
    int fd, void* buf, unsigned int nbytes, off_t offset, FileOpCallback&& cb) {
  struct iovec iov {
    buf, nbytes
  };
  auto* ioSqe = new ReadIoSqe(this, fd, &iov, offset, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}
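
// Usage sketch (illustrative only; assumes FileOpCallback is invoked with
// the operation's result, a negative errno value on failure):
//
//   backend.queueRead(fd, buf, sizeof(buf), 0 /*offset*/, [](int res) {
//     if (res < 0) { /* handle -errno */ } else { /* res bytes read */ }
//   });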

void IoUringBackend::queueWrite(
    int fd,
    const void* buf,
    unsigned int nbytes,
    off_t offset,
    FileOpCallback&& cb) {
  struct iovec iov {
    const_cast<void*>(buf), nbytes
  };
  auto* ioSqe = new WriteIoSqe(this, fd, &iov, offset, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueReadv(
    int fd,
    Range<const struct iovec*> iovecs,
    off_t offset,
    FileOpCallback&& cb) {
  auto* ioSqe = new ReadvIoSqe(this, fd, iovecs, offset, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueWritev(
    int fd,
    Range<const struct iovec*> iovecs,
    off_t offset,
    FileOpCallback&& cb) {
  auto* ioSqe = new WritevIoSqe(this, fd, iovecs, offset, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueFsync(int fd, FileOpCallback&& cb) {
  queueFsync(fd, FSyncFlags::FLAGS_FSYNC, std::move(cb));
}

void IoUringBackend::queueFdatasync(int fd, FileOpCallback&& cb) {
  queueFsync(fd, FSyncFlags::FLAGS_FDATASYNC, std::move(cb));
}

void IoUringBackend::queueFsync(int fd, FSyncFlags flags, FileOpCallback&& cb) {
  auto* ioSqe = new FSyncIoSqe(this, fd, flags, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueOpenat(
    int dfd, const char* path, int flags, mode_t mode, FileOpCallback&& cb) {
  auto* ioSqe = new FOpenAtIoSqe(this, dfd, path, flags, mode, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueOpenat2(
    int dfd, const char* path, struct open_how* how, FileOpCallback&& cb) {
  auto* ioSqe = new FOpenAt2IoSqe(this, dfd, path, how, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueClose(int fd, FileOpCallback&& cb) {
  auto* ioSqe = new FCloseIoSqe(this, fd, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueFallocate(
    int fd, int mode, off_t offset, off_t len, FileOpCallback&& cb) {
  auto* ioSqe = new FAllocateIoSqe(this, fd, mode, offset, len, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueSendmsg(
    int fd, const struct msghdr* msg, unsigned int flags, FileOpCallback&& cb) {
  auto* ioSqe = new SendmsgIoSqe(this, fd, msg, flags, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::queueRecvmsg(
    int fd, struct msghdr* msg, unsigned int flags, FileOpCallback&& cb) {
  auto* ioSqe = new RecvmsgIoSqe(this, fd, msg, flags, std::move(cb));
  ioSqe->backendCb_ = processFileOpCB;
  incNumIoSqeInUse();

  submitImmediateIoSqe(*ioSqe);
}

void IoUringBackend::processFileOp(IoSqe* sqe, int64_t res) noexcept {
  auto* ioSqe = reinterpret_cast<FileOpIoSqe*>(sqe);
  // save the res
  ioSqe->res_ = res;
  activeEvents_.push_back(*ioSqe);
  numInsertedEvents_--;
}

} // namespace folly

#endif // __has_include(<liburing.h>)