1 #ifndef FISH_TOPIC_MONITOR_H
2 #define FISH_TOPIC_MONITOR_H
3 
4 #include <semaphore.h>
5 
6 #include <array>
7 #include <atomic>
8 #include <bitset>
9 #include <condition_variable>
10 #include <limits>
11 #include <numeric>
12 
13 #include "common.h"
14 #include "io.h"
15 
16 /** Topic monitoring support. Topics are conceptually "a thing that can happen." For example,
17  delivery of a SIGINT, a child process exits, etc. It is possible to post to a topic, which means
18  that that thing happened.
19 
20  Associated with each topic is a current generation, which is a 64 bit value. When you query a
21  topic, you get back a generation. If on the next query the generation has increased, then it
22  indicates someone posted to the topic.
23 
24  For example, if you are monitoring a child process, you can query the sigchld topic. If it has
25  increased since your last query, it is possible that your child process has exited.
26 
 Topic postings may be coalesced. That is, there may be two posts to a given topic, yet the
28  generation only increases by 1. The only guarantee is that after a topic post, the current
29  generation value is larger than any value previously queried.
30 
31  Tying this all together is the topic_monitor_t. This provides the current topic generations, and
32  also provides the ability to perform a blocking wait for any topic to change in a particular topic
33  set. This is the real power of topics: you can wait for a sigchld signal OR a thread exit.
34  */
35 
/// Counter type for a topic's generation; the counter is incremented every time the value of the
/// topic changes. It is 64 bits wide, so it will never wrap in practice.
using generation_t = uint64_t;

/// Sentinel generation meaning "this topic is not of interest".
/// This is the all-ones bit pattern, i.e. the largest value a generation_t can hold.
constexpr generation_t invalid_generation = ~static_cast<generation_t>(0);
42 
/// The set of observable topics.
/// The numeric values are spelled out because topic_monitor_t::topic_to_bit() uses them as bit
/// positions (1 << value).
enum class topic_t : uint8_t {
    sighupint = 0,      // Corresponds to both SIGHUP and SIGINT signals.
    sigchld = 1,        // Corresponds to SIGCHLD signal.
    internal_exit = 2,  // Corresponds to an internal process exit.
};
49 
50 /// Helper to return all topics, allowing easy iteration.
all_topics()51 inline std::array<topic_t, 3> all_topics() {
52     return {{topic_t::sighupint, topic_t::sigchld, topic_t::internal_exit}};
53 }
54 
55 /// Simple value type containing the values for a topic.
56 /// This should be kept in sync with topic_t.
57 class generation_list_t {
58    public:
59     generation_list_t() = default;
60 
61     generation_t sighupint{0};
62     generation_t sigchld{0};
63     generation_t internal_exit{0};
64 
65     /// \return the value for a topic.
at(topic_t topic)66     generation_t &at(topic_t topic) {
67         switch (topic) {
68             case topic_t::sigchld:
69                 return sigchld;
70             case topic_t::sighupint:
71                 return sighupint;
72             case topic_t::internal_exit:
73                 return internal_exit;
74         }
75         DIE("Unreachable");
76     }
77 
at(topic_t topic)78     generation_t at(topic_t topic) const {
79         switch (topic) {
80             case topic_t::sighupint:
81                 return sighupint;
82             case topic_t::sigchld:
83                 return sigchld;
84             case topic_t::internal_exit:
85                 return internal_exit;
86         }
87         DIE("Unreachable");
88     }
89 
90     /// \return ourselves as an array.
as_array()91     std::array<generation_t, 3> as_array() const { return {{sighupint, sigchld, internal_exit}}; }
92 
93     /// Set the value of \p topic to the smaller of our value and the value in \p other.
set_min_from(topic_t topic,const generation_list_t & other)94     void set_min_from(topic_t topic, const generation_list_t &other) {
95         if (this->at(topic) > other.at(topic)) {
96             this->at(topic) = other.at(topic);
97         }
98     }
99 
100     /// \return whether a topic is valid.
is_valid(topic_t topic)101     bool is_valid(topic_t topic) const { return this->at(topic) != invalid_generation; }
102 
103     /// \return whether any topic is valid.
any_valid()104     bool any_valid() const {
105         bool valid = false;
106         for (auto gen : as_array()) {
107             if (gen != invalid_generation) valid = true;
108         }
109         return valid;
110     }
111 
112     bool operator==(const generation_list_t &rhs) const {
113         return sighupint == rhs.sighupint && sigchld == rhs.sigchld &&
114                internal_exit == rhs.internal_exit;
115     }
116 
117     bool operator!=(const generation_list_t &rhs) const { return !(*this == rhs); }
118 
119     /// return a string representation for debugging.
120     wcstring describe() const;
121 
122     /// Generation list containing invalid generations only.
invalids()123     static generation_list_t invalids() {
124         return generation_list_t(invalid_generation, invalid_generation, invalid_generation);
125     }
126 
127    private:
generation_list_t(generation_t sighupint,generation_t sigchld,generation_t internal_exit)128     generation_list_t(generation_t sighupint, generation_t sigchld, generation_t internal_exit)
129         : sighupint(sighupint), sigchld(sigchld), internal_exit(internal_exit) {}
130 };
131 
/// A simple binary semaphore.
/// On systems that do not support unnamed semaphores (macOS in particular) this is built on top of
/// a self-pipe. Note that post() must be async-signal safe.
/// All member functions are only declared here; their definitions live out of line.
class binary_semaphore_t {
   public:
    /// Set up the semaphore. Which of sem_ or pipes_ ends up being used is decided by the
    /// out-of-line implementation and recorded in sem_ok_.
    binary_semaphore_t();

    /// Tear down the semaphore.
    ~binary_semaphore_t();

    /// Release a waiting thread.
    /// This must remain async-signal safe, as it may be reached from a signal handler.
    void post();

    /// Wait for a post.
    /// This loops on EINTR.
    void wait();

   private:
    // Print a message and exit.
    void die(const wchar_t *msg) const;

    // Whether our semaphore was successfully initialized.
    // When false, pipes_ is used in place of sem_.
    bool sem_ok_{};

    // The semaphore, if initialized.
    sem_t sem_{};

    // Pipes used to emulate a semaphore, if not initialized.
    autoclose_pipes_t pipes_{};
};
160 
/// The topic monitor class. This permits querying the current generation values for topics,
/// optionally blocking until they increase.
/// What we would like to write is that we have a set of topics, and threads wait for changes on a
/// condition variable which is tickled in post(). But this can't work because post() may be called
/// from a signal handler and condition variables are not async-signal safe.
/// So instead the signal handler announces changes via a binary semaphore.
/// In the wait case, what generally happens is:
///   A thread fetches the generations, sees they have not changed, and then decides to try to wait.
///   It does so by atomically swapping in STATUS_NEEDS_WAKEUP to the status bits.
///   If that succeeds, it waits on the binary semaphore. The post() call will then wake the thread
///   up. If it failed, then either a post() call updated the status values (so perhaps there is a
///   new topic post) or some other thread won the race and called wait() on the semaphore. Here our
///   thread will wait on the data_notifier_ queue.
class topic_monitor_t {
   private:
    /// A set of topics, with one bit per topic (see topic_to_bit()).
    using topic_bitmask_t = uint8_t;

    // Some stuff that needs to be protected by the same lock.
    struct data_t {
        /// The current values.
        generation_list_t current{};

        /// A flag indicating that there is a current reader.
        /// The 'reader' is responsible for calling sema_.wait().
        bool has_reader{false};
    };
    owning_lock<data_t> data_{};

    /// Condition variable for broadcasting notifications.
    /// This is associated with data_'s mutex.
    std::condition_variable data_notifier_{};

    /// A status value which describes our current state, managed via atomics.
    /// Three possibilities:
    ///    0:   no changed topics, no thread is waiting.
    ///    128: no changed topics, some thread is waiting and needs wakeup.
    ///    anything else: some changed topic, no thread is waiting.
    ///  Note that if the msb is set (status == 128) no other bit may be set.
    /// status_bits_t names the same underlying type (uint8_t) that status_ stores.
    using status_bits_t = uint8_t;
    std::atomic<uint8_t> status_{};

    /// Sentinel status value indicating that a thread is waiting and needs a wakeup.
    /// Note it is an error for this bit to be set and also any topic bit.
    static constexpr uint8_t STATUS_NEEDS_WAKEUP = 128;

    /// Binary semaphore used to communicate changes.
    /// If status_ is STATUS_NEEDS_WAKEUP, then a thread has committed to call wait() on our sema
    /// and this must be balanced by the next call to post(). Note only one thread may wait at a
    /// time.
    binary_semaphore_t sema_{};

    /// Apply any pending updates to the data.
    /// This accepts data because it must be locked.
    /// \return the updated generation list.
    generation_list_t updated_gens_in_data(acquired_lock<data_t> &data);

    /// Given a list of input generations, attempt to update them to something newer.
    /// If \p gens is older, then just return those by reference, and directly return false (not
    /// becoming the reader).
    /// If \p gens is current and there is not a reader, then do not update \p gens and return true,
    /// indicating we should become the reader. Now it is our responsibility to wait on the
    /// semaphore and notify on a change via the condition variable. If \p gens is current, and
    /// there is already a reader, then wait until the reader notifies us and try again.
    bool try_update_gens_maybe_becoming_reader(generation_list_t *gens);

    /// Wait for some entry in the list of generations to change.
    /// \return the new gens.
    generation_list_t await_gens(const generation_list_t &input_gens);

    /// \return the current generation list, opportunistically applying any pending updates.
    generation_list_t updated_gens();

    /// Helper to convert a topic to a bitmask containing just that topic.
    /// The bit position is the enumerator's underlying value; the int result of the shift is
    /// implicitly narrowed to topic_bitmask_t on return.
    static topic_bitmask_t topic_to_bit(topic_t t) { return 1 << static_cast<topic_bitmask_t>(t); }

   public:
    topic_monitor_t();
    ~topic_monitor_t();

    /// topic_monitors should not be copied, and there should be no reason to move one.
    void operator=(const topic_monitor_t &) = delete;
    topic_monitor_t(const topic_monitor_t &) = delete;
    void operator=(topic_monitor_t &&) = delete;
    topic_monitor_t(topic_monitor_t &&) = delete;

    /// The principal topic_monitor. This may be fetched from a signal handler.
    static topic_monitor_t &principal();

    /// Post to a topic, potentially from a signal handler.
    void post(topic_t topic);

    /// Access the current generations.
    /// This simply forwards to updated_gens().
    generation_list_t current_generations() { return updated_gens(); }

    /// Access the generation for a topic.
    /// This fetches the full current generation list and picks out the entry for \p topic.
    generation_t generation_for_topic(topic_t topic) { return current_generations().at(topic); }

    /// For each valid topic in \p gens, check to see if the current topic is larger than
    /// the value in \p gens.
    /// If \p wait is set, then wait if there are no changes; otherwise return immediately.
    /// \return true if some topic changed, false if none did.
    /// On a true return, this updates the generation list \p gens.
    bool check(generation_list_t *gens, bool wait);
};
264 
265 #endif
266