1 #include "config.h" // IWYU pragma: keep
2
3 #include "topic_monitor.h"
4
5 #include <limits.h>
6 #include <unistd.h>
7
8 #include "flog.h"
9 #include "iothread.h"
10 #include "wcstringutil.h"
11 #include "wutil.h"
12
describe() const13 wcstring generation_list_t::describe() const {
14 wcstring result;
15 for (generation_t gen : this->as_array()) {
16 if (!result.empty()) result.push_back(L',');
17 if (gen == invalid_generation) {
18 result.append(L"-1");
19 } else {
20 result.append(to_string(gen));
21 }
22 }
23 return result;
24 }
25
binary_semaphore_t()26 binary_semaphore_t::binary_semaphore_t() : sem_ok_(false) {
27 // sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning.
28 // On BSD sem_init uses a file descriptor under the hood which doesn't get CLOEXEC (see #7304).
29 // So use fast semaphores on Linux only.
30 #ifdef __linux__
31 sem_ok_ = (0 == sem_init(&sem_, 0, 0));
32 #endif
33 if (!sem_ok_) {
34 auto pipes = make_autoclose_pipes();
35 assert(pipes.has_value() && "Failed to make pubsub pipes");
36 pipes_ = pipes.acquire();
37
38 // Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the point
39 // where instrumented code makes certain blocking calls. But tsan cannot interrupt a signal
40 // call, so if we're blocked in read() (like the topic monitor wants to be!), we'll never
41 // receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as non-blocking
42 // (so reads will never block) and use select() to poll it.
43 #ifdef FISH_TSAN_WORKAROUNDS
44 DIE_ON_FAILURE(make_fd_nonblocking(pipes_.read.fd()));
45 #endif
46 }
47 }
48
~binary_semaphore_t()49 binary_semaphore_t::~binary_semaphore_t() {
50 // We never use sem_t on Mac. The #ifdef avoids deprecation warnings.
51 #ifndef __APPLE__
52 if (sem_ok_) (void)sem_destroy(&sem_);
53 #endif
54 }
55
/// Report an unexpected system-call failure and abort.
/// \param msg label passed to wperror() ahead of the DIE() call; callers in this file pass the
/// name of the failing call (e.g. L"sem_post", L"read").
void binary_semaphore_t::die(const wchar_t *msg) const {
    // wperror is called first, before DIE() can clobber anything it reports.
    wperror(msg);
    DIE("unexpected failure");
}
60
post()61 void binary_semaphore_t::post() {
62 if (sem_ok_) {
63 int res = sem_post(&sem_);
64 // sem_post is non-interruptible.
65 if (res < 0) die(L"sem_post");
66 } else {
67 // Write exactly one byte.
68 ssize_t ret;
69 do {
70 const uint8_t v = 0;
71 ret = write(pipes_.write.fd(), &v, sizeof v);
72 } while (ret < 0 && errno == EINTR);
73 if (ret < 0) die(L"write");
74 }
75 }
76
wait()77 void binary_semaphore_t::wait() {
78 if (sem_ok_) {
79 int res;
80 do {
81 res = sem_wait(&sem_);
82 } while (res < 0 && errno == EINTR);
83 // Other errors here are very unexpected.
84 if (res < 0) die(L"sem_wait");
85 } else {
86 int fd = pipes_.read.fd();
87 // We must read exactly one byte.
88 for (;;) {
89 #ifdef FISH_TSAN_WORKAROUNDS
90 // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
91 // call until data is available (that is, fish would use 100% cpu while waiting for
92 // processes). The select prevents that.
93 (void)select_wrapper_t::is_fd_readable(fd, select_wrapper_t::kNoTimeout);
94 #endif
95 uint8_t ignored;
96 auto amt = read(fd, &ignored, sizeof ignored);
97 if (amt == 1) break;
98 // EAGAIN should only be returned in TSan case.
99 if (amt < 0 && errno != EINTR && errno != EAGAIN) die(L"read");
100 }
101 }
102 }
103
104 /// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a
105 /// pointless at-exit handler for the dtor.
106 static topic_monitor_t *const s_principal = new topic_monitor_t();
107
principal()108 topic_monitor_t &topic_monitor_t::principal() {
109 // Do not attempt to move s_principal to a function-level static, it needs to be accessed from a
110 // signal handler so it must not be lazily created.
111 return *s_principal;
112 }
113
/// Constructor and destructor use the compiler-generated implementations, defined out of line
/// here in the implementation file.
topic_monitor_t::topic_monitor_t() = default;
topic_monitor_t::~topic_monitor_t() = default;
116
post(topic_t topic)117 void topic_monitor_t::post(topic_t topic) {
118 // Beware, we may be in a signal handler!
119 // Atomically update the pending topics.
120 const uint8_t topicbit = topic_to_bit(topic);
121
122 // CAS in our bit, capturing the old status value.
123 status_bits_t oldstatus;
124 bool cas_success = false;
125 while (!cas_success) {
126 oldstatus = status_.load(std::memory_order_relaxed);
127 // Clear wakeup bit and set our topic bit.
128 status_bits_t newstatus = oldstatus;
129 newstatus &= ~STATUS_NEEDS_WAKEUP;
130 newstatus |= topicbit;
131 cas_success = status_.compare_exchange_weak(oldstatus, newstatus);
132 }
133 // Note that if the STATUS_NEEDS_WAKEUP bit is set, no other bits must be set.
134 assert(((oldstatus == STATUS_NEEDS_WAKEUP) == bool(oldstatus & STATUS_NEEDS_WAKEUP)) &&
135 "If STATUS_NEEDS_WAKEUP is set no other bits should be set");
136
137 // If the bit was already set, then someone else posted to this topic and nobody has reacted to
138 // it yet. In that case we're done.
139 if (oldstatus & topicbit) {
140 return;
141 }
142
143 // We set a new bit.
144 // Check if we should wake up a thread because it was waiting.
145 if (oldstatus & STATUS_NEEDS_WAKEUP) {
146 std::atomic_thread_fence(std::memory_order_release);
147 sema_.post();
148 }
149 }
150
updated_gens_in_data(acquired_lock<data_t> & data)151 generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock<data_t> &data) {
152 // Atomically acquire the pending updates, swapping in 0.
153 // If there are no pending updates (likely) or a thread is waiting, just return.
154 // Otherwise CAS in 0 and update our topics.
155 const auto relaxed = std::memory_order_relaxed;
156 topic_bitmask_t changed_topic_bits;
157 bool cas_success;
158 do {
159 changed_topic_bits = status_.load(relaxed);
160 if (changed_topic_bits == 0 || changed_topic_bits == STATUS_NEEDS_WAKEUP)
161 return data->current;
162 cas_success = status_.compare_exchange_weak(changed_topic_bits, 0);
163 } while (!cas_success);
164 assert((changed_topic_bits & STATUS_NEEDS_WAKEUP) == 0 &&
165 "Thread waiting bit should not be set");
166
167 // Update the current generation with our topics and return it.
168 for (topic_t topic : all_topics()) {
169 if (changed_topic_bits & topic_to_bit(topic)) {
170 data->current.at(topic) += 1;
171 FLOG(topic_monitor, "Updating topic", static_cast<int>(topic), "to",
172 data->current.at(topic));
173 }
174 }
175 // Report our change.
176 data_notifier_.notify_all();
177 return data->current;
178 }
179
updated_gens()180 generation_list_t topic_monitor_t::updated_gens() {
181 auto data = data_.acquire();
182 return updated_gens_in_data(data);
183 }
184
try_update_gens_maybe_becoming_reader(generation_list_t * gens)185 bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *gens) {
186 bool become_reader = false;
187 auto data = data_.acquire();
188 for (;;) {
189 // See if the updated gen list has changed. If so we don't need to become the reader.
190 auto current = updated_gens_in_data(data);
191 FLOG(topic_monitor, "TID", thread_id(), "local ", gens->describe(), ": current",
192 current.describe());
193 if (*gens != current) {
194 *gens = current;
195 break;
196 }
197
198 // The generations haven't changed. Perhaps we become the reader.
199 // Note we still hold the lock, so this cannot race with any other thread becoming the
200 // reader.
201 if (data->has_reader) {
202 // We already have a reader, wait for it to notify us and loop again.
203 data_notifier_.wait(data.get_lock());
204 continue;
205 } else {
206 // We will try to become the reader.
207 // Reader bit should not be set in this case.
208 assert((status_.load() & STATUS_NEEDS_WAKEUP) == 0 && "No thread should be waiting");
209 // Try becoming the reader by marking the reader bit.
210 status_bits_t expected_old = 0;
211 if (!status_.compare_exchange_strong(expected_old, STATUS_NEEDS_WAKEUP)) {
212 // We failed to become the reader, perhaps because another topic post just arrived.
213 // Loop again.
214 continue;
215 }
216 // We successfully did a CAS from 0 -> STATUS_NEEDS_WAKEUP.
217 // Now any successive topic post must signal us.
218 FLOG(topic_monitor, "TID", thread_id(), "becoming reader");
219 become_reader = true;
220 data->has_reader = true;
221 break;
222 }
223 }
224 return become_reader;
225 }
226
/// Block until the current generations differ from \p input_gens.
/// \return the generation list that was observed to differ.
generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gens) {
    generation_list_t observed = input_gens;
    while (observed == input_gens) {
        if (!try_update_gens_maybe_becoming_reader(&observed)) {
            // We did not become the reader; the loop condition decides whether we are done.
            continue;
        }

        // We are the reader now, and the data lock is no longer held.
        assert(observed == input_gens &&
               "Generations should not have changed if we are the reader.");

        // Sleep until a post wakes us.
        sema_.wait();

        // Done waiting: step down as reader and notify the condition variable so that any threads
        // waiting for us to finish reading are woken.
        auto locked = data_.acquire();
        observed = locked->current;
        FLOG(topic_monitor, "TID", thread_id(), "local", input_gens.describe(),
             "read() complete, current is", observed.describe());
        assert(locked->has_reader && "We should be the reader");
        locked->has_reader = false;
        data_notifier_.notify_all();
    }
    return observed;
}
253
check(generation_list_t * gens,bool wait)254 bool topic_monitor_t::check(generation_list_t *gens, bool wait) {
255 if (!gens->any_valid()) return false;
256
257 generation_list_t current = updated_gens();
258 bool changed = false;
259 for (;;) {
260 // Load the topic list and see if anything has changed.
261 for (topic_t topic : all_topics()) {
262 if (gens->is_valid(topic)) {
263 assert(gens->at(topic) <= current.at(topic) &&
264 "Incoming gen count exceeded published count");
265 if (gens->at(topic) < current.at(topic)) {
266 gens->at(topic) = current.at(topic);
267 changed = true;
268 }
269 }
270 }
271
272 // If we're not waiting, or something changed, then we're done.
273 if (!wait || changed) {
274 break;
275 }
276
277 // Wait until our gens change.
278 current = await_gens(current);
279 }
280 return changed;
281 }
282