1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/utils/port/detail/KQueue.h"
8
9 char disable_linker_warning_about_empty_file_kqueue_cpp TD_UNUSED;
10
11 #ifdef TD_POLL_KQUEUE
12
13 #include "td/utils/logging.h"
14 #include "td/utils/Status.h"
15
16 #include <cerrno>
17 #include <utility>
18
19 #include <sys/time.h>
20 #include <unistd.h>
21
22 namespace td {
23 namespace detail {
24
~KQueue()25 KQueue::~KQueue() {
26 clear();
27 }
init()28 void KQueue::init() {
29 kq_ = NativeFd(kqueue());
30 auto kqueue_errno = errno;
31 LOG_IF(FATAL, !kq_) << Status::PosixError(kqueue_errno, "kqueue creation failed");
32
33 // TODO: const
34 events_.resize(1000);
35 changes_n_ = 0;
36 }
37
clear()38 void KQueue::clear() {
39 if (!kq_) {
40 return;
41 }
42 events_.clear();
43 kq_.close();
44 for (auto *list_node = list_root_.next; list_node != &list_root_;) {
45 auto pollable_fd = PollableFd::from_list_node(list_node);
46 list_node = list_node->next;
47 }
48 }
49
update(int nevents,const timespec * timeout,bool may_fail)50 int KQueue::update(int nevents, const timespec *timeout, bool may_fail) {
51 int err = kevent(kq_.fd(), &events_[0], changes_n_, &events_[0], nevents, timeout);
52 auto kevent_errno = errno;
53
54 bool is_fatal_error = [&] {
55 if (err != -1) {
56 return false;
57 }
58 if (may_fail && kevent_errno == ENOENT) {
59 return false;
60 }
61 return kevent_errno != EINTR;
62 }();
63 if (is_fatal_error) {
64 LOG(FATAL) << Status::PosixError(kevent_errno, "kevent failed");
65 }
66
67 changes_n_ = 0;
68 if (err < 0) {
69 return 0;
70 }
71 return err;
72 }
73
flush_changes(bool may_fail)74 void KQueue::flush_changes(bool may_fail) {
75 if (!changes_n_) {
76 return;
77 }
78 int n = update(0, nullptr, may_fail);
79 CHECK(n == 0);
80 }
81
add_change(std::uintptr_t ident,int16 filter,uint16 flags,uint32 fflags,std::intptr_t data,void * udata)82 void KQueue::add_change(std::uintptr_t ident, int16 filter, uint16 flags, uint32 fflags, std::intptr_t data,
83 void *udata) {
84 if (changes_n_ == static_cast<int>(events_.size())) {
85 flush_changes();
86 }
87 #if TD_NETBSD
88 auto set_udata = reinterpret_cast<std::intptr_t>(udata);
89 #else
90 auto set_udata = udata;
91 #endif
92 EV_SET(&events_[changes_n_], ident, filter, flags, fflags, data, set_udata);
93 VLOG(fd) << "Subscribe [fd:" << ident << "] [filter:" << filter << "] [udata: " << udata << "]";
94 changes_n_++;
95 }
96
subscribe(PollableFd fd,PollFlags flags)97 void KQueue::subscribe(PollableFd fd, PollFlags flags) {
98 auto native_fd = fd.native_fd().fd();
99 auto list_node = fd.release_as_list_node();
100 list_root_.put(list_node);
101 if (flags.can_read()) {
102 add_change(native_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, list_node);
103 }
104 if (flags.can_write()) {
105 add_change(native_fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, list_node);
106 }
107 }
108
invalidate(int native_fd)109 void KQueue::invalidate(int native_fd) {
110 for (int i = 0; i < changes_n_; i++) {
111 if (events_[i].ident == static_cast<std::uintptr_t>(native_fd)) {
112 changes_n_--;
113 std::swap(events_[i], events_[changes_n_]);
114 i--;
115 }
116 }
117 }
118
unsubscribe(PollableFdRef fd_ref)119 void KQueue::unsubscribe(PollableFdRef fd_ref) {
120 auto pollable_fd = fd_ref.lock();
121 auto native_fd = pollable_fd.native_fd().fd();
122
123 // invalidate(fd);
124 flush_changes();
125 add_change(native_fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
126 flush_changes(true);
127 add_change(native_fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
128 flush_changes(true);
129 }
130
unsubscribe_before_close(PollableFdRef fd_ref)131 void KQueue::unsubscribe_before_close(PollableFdRef fd_ref) {
132 auto pollable_fd = fd_ref.lock();
133 invalidate(pollable_fd.native_fd().fd());
134
135 // just to avoid O(changes_n ^ 2)
136 if (changes_n_ != 0) {
137 flush_changes();
138 }
139 }
140
run(int timeout_ms)141 void KQueue::run(int timeout_ms) {
142 timespec timeout_data;
143 timespec *timeout_ptr;
144 if (timeout_ms == -1) {
145 timeout_ptr = nullptr;
146 } else {
147 timeout_data.tv_sec = timeout_ms / 1000;
148 timeout_data.tv_nsec = timeout_ms % 1000 * 1000000;
149 timeout_ptr = &timeout_data;
150 }
151
152 int n = update(static_cast<int>(events_.size()), timeout_ptr);
153 for (int i = 0; i < n; i++) {
154 struct kevent *event = &events_[i];
155 PollFlags flags;
156 if (event->filter == EVFILT_WRITE) {
157 flags.add_flags(PollFlags::Write());
158 }
159 if (event->filter == EVFILT_READ) {
160 flags.add_flags(PollFlags::Read());
161 }
162 if (event->flags & EV_EOF) {
163 flags.add_flags(PollFlags::Close());
164 }
165 if (event->fflags & EV_ERROR) {
166 LOG(FATAL) << "EV_ERROR in kqueue is not supported";
167 }
168 #if TD_NETBSD
169 auto udata = reinterpret_cast<void *>(event->udata);
170 #else
171 auto udata = event->udata;
172 #endif
173 VLOG(fd) << "Event [fd:" << event->ident << "] [filter:" << event->filter << "] [udata: " << udata << "]";
174 // LOG(WARNING) << "Have event->ident = " << event->ident << "event->filter = " << event->filter;
175 auto pollable_fd = PollableFd::from_list_node(static_cast<ListNode *>(udata));
176 pollable_fd.add_flags(flags);
177 pollable_fd.release_as_list_node();
178 }
179 }
180 } // namespace detail
181 } // namespace td
182
183 #endif
184