1 /* Copyright 2012-present Facebook, Inc.
2 * Licensed under the Apache License, Version 2.0 */
3
#include "watchman.h"
#include "InMemoryView.h"

#include <algorithm>
#include <limits>
6
7 namespace watchman {
8
waitUntilReadyToQuery(const std::shared_ptr<w_root_t> & root)9 std::shared_future<void> InMemoryView::waitUntilReadyToQuery(
10 const std::shared_ptr<w_root_t>& root) {
11 auto lockPair = acquireLockedPair(root->recrawlInfo, crawlState_);
12
13 if (lockPair.second->promise && lockPair.second->future.valid()) {
14 return lockPair.second->future;
15 }
16
17 if (root->inner.done_initial && !lockPair.first->shouldRecrawl) {
18 // Return an already satisfied future
19 std::promise<void> p;
20 p.set_value();
21 return p.get_future();
22 }
23
24 // Not yet done, so queue up the promise
25 lockPair.second->promise = watchman::make_unique<std::promise<void>>();
26 lockPair.second->future =
27 std::shared_future<void>(lockPair.second->promise->get_future());
28 return lockPair.second->future;
29 }
30
fullCrawl(const std::shared_ptr<w_root_t> & root,PendingCollection::LockedPtr & pending)31 void InMemoryView::fullCrawl(
32 const std::shared_ptr<w_root_t>& root,
33 PendingCollection::LockedPtr& pending) {
34 struct timeval start;
35
36 w_perf_t sample("full-crawl");
37 if (config_.getBool("iothrottle", false)) {
38 w_ioprio_set_low();
39 }
40 {
41 auto view = view_.wlock();
42 // Ensure that we observe these files with a new, distinct clock,
43 // otherwise a fresh subscription established immediately after a watch
44 // can get stuck with an empty view until another change is observed
45 view->mostRecentTick++;
46 gettimeofday(&start, NULL);
47 pending_.wlock()->add(root->root_path, start, W_PENDING_RECURSIVE);
48 // There is the potential for a subtle race condition here. The boolean
49 // parameter indicates whether we want to merge in the set of
50 // notifications pending from the watcher or not. Since we now coalesce
51 // overlaps we must consume our outstanding set before we merge in any
52 // new kernel notification information or we risk missing out on
53 // observing changes that happen during the initial crawl. This
54 // translates to a two level loop; the outer loop sweeps in data from
55 // inotify, then the inner loop processes it and any dirs that we pick up
56 // from recursive processing.
57 while (processPending(root, view, pending, true)) {
58 while (processPending(root, view, pending, false)) {
59 ;
60 }
61 }
62 {
63 auto lockPair = acquireLockedPair(root->recrawlInfo, crawlState_);
64 lockPair.first->shouldRecrawl = false;
65 if (lockPair.second->promise) {
66 lockPair.second->promise->set_value();
67 lockPair.second->promise.reset();
68 }
69 root->inner.done_initial = true;
70 }
71 root->cookies.abortAllCookies();
72 }
73 sample.add_root_meta(root);
74
75 if (config_.getBool("iothrottle", false)) {
76 w_ioprio_set_normal();
77 }
78
79 sample.finish();
80 sample.force_log();
81 sample.log();
82
83 w_log(
84 W_LOG_ERR,
85 "%scrawl complete\n",
86 root->recrawlInfo.rlock()->recrawlCount ? "re" : "");
87 }
88
89 // Performs settle-time actions.
90 // Returns true if the root was reaped and the io thread should terminate.
do_settle_things(const std::shared_ptr<w_root_t> & root)91 static bool do_settle_things(const std::shared_ptr<w_root_t>& root) {
92 // No new pending items were given to us, so consider that
93 // we may now be settled.
94
95 root->processPendingSymlinkTargets();
96
97 if (!root->inner.done_initial) {
98 // we need to recrawl, stop what we're doing here
99 return false;
100 }
101
102 auto view = std::dynamic_pointer_cast<watchman::InMemoryView>(root->view());
103 w_assert(view, "we're called from InMemoryView, wat?");
104 view->warmContentCache();
105
106 auto settledPayload = json_object({{"settled", json_true()}});
107 root->unilateralResponses->enqueue(std::move(settledPayload));
108
109 if (root->considerReap()) {
110 root->stopWatch();
111 return true;
112 }
113
114 root->considerAgeOut();
115 return false;
116 }
117
clientModeCrawl(const std::shared_ptr<w_root_t> & root)118 void InMemoryView::clientModeCrawl(const std::shared_ptr<w_root_t>& root) {
119 PendingCollection pending;
120
121 auto lock = pending.wlock();
122 fullCrawl(root, lock);
123 }
124
handleShouldRecrawl(const std::shared_ptr<w_root_t> & root)125 bool InMemoryView::handleShouldRecrawl(const std::shared_ptr<w_root_t>& root) {
126 {
127 auto info = root->recrawlInfo.rlock();
128 if (!info->shouldRecrawl) {
129 return false;
130 }
131 }
132
133 if (!root->inner.cancelled) {
134 auto info = root->recrawlInfo.wlock();
135 info->recrawlCount++;
136 root->inner.done_initial = false;
137 }
138
139 return true;
140 }
141
ioThread(const std::shared_ptr<w_root_t> & root)142 void InMemoryView::ioThread(const std::shared_ptr<w_root_t>& root) {
143 int timeoutms, biggest_timeout;
144 PendingCollection pending;
145 auto localPendingLock = pending.wlock();
146
147 timeoutms = root->trigger_settle;
148
149 // Upper bound on sleep delay. These options are measured in seconds.
150 biggest_timeout = root->gc_interval;
151 if (biggest_timeout == 0 ||
152 (root->idle_reap_age != 0 && root->idle_reap_age < biggest_timeout)) {
153 biggest_timeout = root->idle_reap_age;
154 }
155 if (biggest_timeout == 0) {
156 biggest_timeout = 86400;
157 }
158 // And convert to milliseconds
159 biggest_timeout *= 1000;
160
161 while (!stopThreads_) {
162 bool pinged;
163
164 if (!root->inner.done_initial) {
165 /* first order of business is to find all the files under our root */
166 fullCrawl(root, localPendingLock);
167
168 timeoutms = root->trigger_settle;
169 }
170
171 // Wait for the notify thread to give us pending items, or for
172 // the settle period to expire
173 {
174 w_log(W_LOG_DBG, "poll_events timeout=%dms\n", timeoutms);
175 auto targetPendingLock =
176 pending_.lockAndWait(std::chrono::milliseconds(timeoutms), pinged);
177 w_log(W_LOG_DBG, " ... wake up (pinged=%s)\n", pinged ? "true" : "false");
178 localPendingLock->append(&*targetPendingLock);
179 }
180
181 if (handleShouldRecrawl(root)) {
182 fullCrawl(root, localPendingLock);
183 timeoutms = root->trigger_settle;
184 continue;
185 }
186
187 if (!pinged && localPendingLock->size() == 0) {
188 if (do_settle_things(root)) {
189 break;
190 }
191 timeoutms = std::min(biggest_timeout, timeoutms * 2);
192 continue;
193 }
194
195 // Otherwise we have pending items to stat and crawl
196
197 // We are now, by definition, unsettled, so reduce sleep timeout
198 // to the settle duration ready for the next loop through
199 timeoutms = root->trigger_settle;
200
201 {
202 auto view = view_.wlock();
203 if (!root->inner.done_initial) {
204 // we need to recrawl. Discard these notifications
205 localPendingLock->drain();
206 continue;
207 }
208
209 view->mostRecentTick++;
210
211 while (processPending(root, view, localPendingLock, false)) {
212 ;
213 }
214 }
215 }
216 }
217
processPath(const std::shared_ptr<w_root_t> & root,SyncView::LockedPtr & view,PendingCollection::LockedPtr & coll,const w_string & full_path,struct timeval now,int flags,const watchman_dir_ent * pre_stat)218 void InMemoryView::processPath(
219 const std::shared_ptr<w_root_t>& root,
220 SyncView::LockedPtr& view,
221 PendingCollection::LockedPtr& coll,
222 const w_string& full_path,
223 struct timeval now,
224 int flags,
225 const watchman_dir_ent* pre_stat) {
226 /* From a particular query's point of view, there are four sorts of cookies we
227 * can observe:
228 * 1. Cookies that this query has created. This marks the end of this query's
229 * sync_to_now, so we hide it from the results.
230 * 2. Cookies that another query on the same watch by the same process has
231 * created. This marks the end of that other query's sync_to_now, so from
232 * the point of view of this query we turn a blind eye to it.
233 * 3. Cookies created by another process on the same watch. We're independent
234 * of other processes, so we report these.
235 * 4. Cookies created by a nested watch by the same or a different process.
236 * We're independent of other watches, so we report these.
237 *
238 * The below condition is true for cases 1 and 2 and false for 3 and 4.
239 */
240 if (w_string_startswith(full_path, cookies_.cookiePrefix())) {
241 bool consider_cookie =
242 (watcher_->flags & WATCHER_HAS_PER_FILE_NOTIFICATIONS)
243 ? ((flags & W_PENDING_VIA_NOTIFY) || !root->inner.done_initial)
244 : true;
245
246 if (consider_cookie) {
247 cookies_.notifyCookie(full_path);
248 }
249
250 // Never allow cookie files to show up in the tree
251 return;
252 }
253
254 if (w_string_equal(full_path, root_path) ||
255 (flags & W_PENDING_CRAWL_ONLY) == W_PENDING_CRAWL_ONLY) {
256 crawler(
257 root,
258 view,
259 coll,
260 full_path,
261 now,
262 (flags & W_PENDING_RECURSIVE) == W_PENDING_RECURSIVE);
263 } else {
264 statPath(root, view, coll, full_path, now, flags, pre_stat);
265 }
266 }
267
processPending(const std::shared_ptr<w_root_t> & root,SyncView::LockedPtr & view,PendingCollection::LockedPtr & coll,bool pullFromRoot)268 bool InMemoryView::processPending(
269 const std::shared_ptr<w_root_t>& root,
270 SyncView::LockedPtr& view,
271 PendingCollection::LockedPtr& coll,
272 bool pullFromRoot) {
273 if (pullFromRoot) {
274 auto srcLock = pending_.wlock();
275 coll->append(&*srcLock);
276 }
277
278 if (!coll->size()) {
279 return false;
280 }
281
282 w_log(
283 W_LOG_DBG,
284 "processing %d events in %s\n",
285 coll->size(),
286 root_path.c_str());
287
288 auto pending = coll->stealItems();
289
290 while (pending) {
291 if (!stopThreads_) {
292 processPath(
293 root,
294 view,
295 coll,
296 pending->path,
297 pending->now,
298 pending->flags,
299 nullptr);
300 }
301
302 pending = std::move(pending->next);
303 }
304
305 return true;
306 }
307 }
308
309 /* vim:ts=2:sw=2:et:
310 */
311