1 #include "config.h"  // IWYU pragma: keep
2 
3 #include "topic_monitor.h"
4 
5 #include <limits.h>
6 #include <unistd.h>
7 
8 #include "flog.h"
9 #include "iothread.h"
10 #include "wcstringutil.h"
11 #include "wutil.h"
12 
describe() const13 wcstring generation_list_t::describe() const {
14     wcstring result;
15     for (generation_t gen : this->as_array()) {
16         if (!result.empty()) result.push_back(L',');
17         if (gen == invalid_generation) {
18             result.append(L"-1");
19         } else {
20             result.append(to_string(gen));
21         }
22     }
23     return result;
24 }
25 
binary_semaphore_t()26 binary_semaphore_t::binary_semaphore_t() : sem_ok_(false) {
27     // sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning.
28     // On BSD sem_init uses a file descriptor under the hood which doesn't get CLOEXEC (see #7304).
29     // So use fast semaphores on Linux only.
30 #ifdef __linux__
31     sem_ok_ = (0 == sem_init(&sem_, 0, 0));
32 #endif
33     if (!sem_ok_) {
34         auto pipes = make_autoclose_pipes();
35         assert(pipes.has_value() && "Failed to make pubsub pipes");
36         pipes_ = pipes.acquire();
37 
38         // Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the point
39         // where instrumented code makes certain blocking calls. But tsan cannot interrupt a signal
40         // call, so if we're blocked in read() (like the topic monitor wants to be!), we'll never
41         // receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as non-blocking
42         // (so reads will never block) and use select() to poll it.
43 #ifdef FISH_TSAN_WORKAROUNDS
44         DIE_ON_FAILURE(make_fd_nonblocking(pipes_.read.fd()));
45 #endif
46     }
47 }
48 
~binary_semaphore_t()49 binary_semaphore_t::~binary_semaphore_t() {
50     // We never use sem_t on Mac. The #ifdef avoids deprecation warnings.
51 #ifndef __APPLE__
52     if (sem_ok_) (void)sem_destroy(&sem_);
53 #endif
54 }
55 
die(const wchar_t * msg) const56 void binary_semaphore_t::die(const wchar_t *msg) const {
57     wperror(msg);
58     DIE("unexpected failure");
59 }
60 
post()61 void binary_semaphore_t::post() {
62     if (sem_ok_) {
63         int res = sem_post(&sem_);
64         // sem_post is non-interruptible.
65         if (res < 0) die(L"sem_post");
66     } else {
67         // Write exactly one byte.
68         ssize_t ret;
69         do {
70             const uint8_t v = 0;
71             ret = write(pipes_.write.fd(), &v, sizeof v);
72         } while (ret < 0 && errno == EINTR);
73         if (ret < 0) die(L"write");
74     }
75 }
76 
wait()77 void binary_semaphore_t::wait() {
78     if (sem_ok_) {
79         int res;
80         do {
81             res = sem_wait(&sem_);
82         } while (res < 0 && errno == EINTR);
83         // Other errors here are very unexpected.
84         if (res < 0) die(L"sem_wait");
85     } else {
86         int fd = pipes_.read.fd();
87         // We must read exactly one byte.
88         for (;;) {
89 #ifdef FISH_TSAN_WORKAROUNDS
90             // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
91             // call until data is available (that is, fish would use 100% cpu while waiting for
92             // processes). The select prevents that.
93             (void)select_wrapper_t::is_fd_readable(fd, select_wrapper_t::kNoTimeout);
94 #endif
95             uint8_t ignored;
96             auto amt = read(fd, &ignored, sizeof ignored);
97             if (amt == 1) break;
98             // EAGAIN should only be returned in TSan case.
99             if (amt < 0 && errno != EINTR && errno != EAGAIN) die(L"read");
100         }
101     }
102 }
103 
104 /// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a
105 /// pointless at-exit handler for the dtor.
106 static topic_monitor_t *const s_principal = new topic_monitor_t();
107 
principal()108 topic_monitor_t &topic_monitor_t::principal() {
109     // Do not attempt to move s_principal to a function-level static, it needs to be accessed from a
110     // signal handler so it must not be lazily created.
111     return *s_principal;
112 }
113 
114 topic_monitor_t::topic_monitor_t() = default;
115 topic_monitor_t::~topic_monitor_t() = default;
116 
post(topic_t topic)117 void topic_monitor_t::post(topic_t topic) {
118     // Beware, we may be in a signal handler!
119     // Atomically update the pending topics.
120     const uint8_t topicbit = topic_to_bit(topic);
121 
122     // CAS in our bit, capturing the old status value.
123     status_bits_t oldstatus;
124     bool cas_success = false;
125     while (!cas_success) {
126         oldstatus = status_.load(std::memory_order_relaxed);
127         // Clear wakeup bit and set our topic bit.
128         status_bits_t newstatus = oldstatus;
129         newstatus &= ~STATUS_NEEDS_WAKEUP;
130         newstatus |= topicbit;
131         cas_success = status_.compare_exchange_weak(oldstatus, newstatus);
132     }
133     // Note that if the STATUS_NEEDS_WAKEUP bit is set, no other bits must be set.
134     assert(((oldstatus == STATUS_NEEDS_WAKEUP) == bool(oldstatus & STATUS_NEEDS_WAKEUP)) &&
135            "If STATUS_NEEDS_WAKEUP is set no other bits should be set");
136 
137     // If the bit was already set, then someone else posted to this topic and nobody has reacted to
138     // it yet. In that case we're done.
139     if (oldstatus & topicbit) {
140         return;
141     }
142 
143     // We set a new bit.
144     // Check if we should wake up a thread because it was waiting.
145     if (oldstatus & STATUS_NEEDS_WAKEUP) {
146         std::atomic_thread_fence(std::memory_order_release);
147         sema_.post();
148     }
149 }
150 
updated_gens_in_data(acquired_lock<data_t> & data)151 generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock<data_t> &data) {
152     // Atomically acquire the pending updates, swapping in 0.
153     // If there are no pending updates (likely) or a thread is waiting, just return.
154     // Otherwise CAS in 0 and update our topics.
155     const auto relaxed = std::memory_order_relaxed;
156     topic_bitmask_t changed_topic_bits;
157     bool cas_success;
158     do {
159         changed_topic_bits = status_.load(relaxed);
160         if (changed_topic_bits == 0 || changed_topic_bits == STATUS_NEEDS_WAKEUP)
161             return data->current;
162         cas_success = status_.compare_exchange_weak(changed_topic_bits, 0);
163     } while (!cas_success);
164     assert((changed_topic_bits & STATUS_NEEDS_WAKEUP) == 0 &&
165            "Thread waiting bit should not be set");
166 
167     // Update the current generation with our topics and return it.
168     for (topic_t topic : all_topics()) {
169         if (changed_topic_bits & topic_to_bit(topic)) {
170             data->current.at(topic) += 1;
171             FLOG(topic_monitor, "Updating topic", static_cast<int>(topic), "to",
172                  data->current.at(topic));
173         }
174     }
175     // Report our change.
176     data_notifier_.notify_all();
177     return data->current;
178 }
179 
updated_gens()180 generation_list_t topic_monitor_t::updated_gens() {
181     auto data = data_.acquire();
182     return updated_gens_in_data(data);
183 }
184 
try_update_gens_maybe_becoming_reader(generation_list_t * gens)185 bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *gens) {
186     bool become_reader = false;
187     auto data = data_.acquire();
188     for (;;) {
189         // See if the updated gen list has changed. If so we don't need to become the reader.
190         auto current = updated_gens_in_data(data);
191         FLOG(topic_monitor, "TID", thread_id(), "local ", gens->describe(), ": current",
192              current.describe());
193         if (*gens != current) {
194             *gens = current;
195             break;
196         }
197 
198         // The generations haven't changed. Perhaps we become the reader.
199         // Note we still hold the lock, so this cannot race with any other thread becoming the
200         // reader.
201         if (data->has_reader) {
202             // We already have a reader, wait for it to notify us and loop again.
203             data_notifier_.wait(data.get_lock());
204             continue;
205         } else {
206             // We will try to become the reader.
207             // Reader bit should not be set in this case.
208             assert((status_.load() & STATUS_NEEDS_WAKEUP) == 0 && "No thread should be waiting");
209             // Try becoming the reader by marking the reader bit.
210             status_bits_t expected_old = 0;
211             if (!status_.compare_exchange_strong(expected_old, STATUS_NEEDS_WAKEUP)) {
212                 // We failed to become the reader, perhaps because another topic post just arrived.
213                 // Loop again.
214                 continue;
215             }
216             // We successfully did a CAS from 0 -> STATUS_NEEDS_WAKEUP.
217             // Now any successive topic post must signal us.
218             FLOG(topic_monitor, "TID", thread_id(), "becoming reader");
219             become_reader = true;
220             data->has_reader = true;
221             break;
222         }
223     }
224     return become_reader;
225 }
226 
/// Block until the current generations differ from \p input_gens.
/// \param input_gens the generations the caller has already seen.
/// \return the first generation list observed that is not equal to \p input_gens.
generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gens) {
    generation_list_t gens = input_gens;
    while (gens == input_gens) {
        // Either gens gets refreshed (and the loop condition decides), or we become the reader.
        if (!try_update_gens_maybe_becoming_reader(&gens)) continue;

        // We are the reader now, and we no longer hold the lock.
        assert(gens == input_gens &&
               "Generations should not have changed if we are the reader.");

        // Sleep until a post wakes us.
        sema_.wait();

        // Done waiting: pick up the current generations, give up the reader role, and wake any
        // threads that were waiting for us to finish reading.
        auto data = data_.acquire();
        gens = data->current;
        FLOG(topic_monitor, "TID", thread_id(), "local", input_gens.describe(),
             "read() complete, current is", gens.describe());
        assert(data->has_reader && "We should be the reader");
        data->has_reader = false;
        data_notifier_.notify_all();
    }
    return gens;
}
253 
check(generation_list_t * gens,bool wait)254 bool topic_monitor_t::check(generation_list_t *gens, bool wait) {
255     if (!gens->any_valid()) return false;
256 
257     generation_list_t current = updated_gens();
258     bool changed = false;
259     for (;;) {
260         // Load the topic list and see if anything has changed.
261         for (topic_t topic : all_topics()) {
262             if (gens->is_valid(topic)) {
263                 assert(gens->at(topic) <= current.at(topic) &&
264                        "Incoming gen count exceeded published count");
265                 if (gens->at(topic) < current.at(topic)) {
266                     gens->at(topic) = current.at(topic);
267                     changed = true;
268                 }
269             }
270         }
271 
272         // If we're not waiting, or something changed, then we're done.
273         if (!wait || changed) {
274             break;
275         }
276 
277         // Wait until our gens change.
278         current = await_gens(current);
279     }
280     return changed;
281 }
282