1 /* Copyright 2012-present Facebook, Inc.
2  * Licensed under the Apache License, Version 2.0 */
3 
#include "watchman.h"
#include "InMemoryView.h"

#include <algorithm>
#include <cstdint>
#include <limits>
6 
7 namespace watchman {
8 
waitUntilReadyToQuery(const std::shared_ptr<w_root_t> & root)9 std::shared_future<void> InMemoryView::waitUntilReadyToQuery(
10     const std::shared_ptr<w_root_t>& root) {
11   auto lockPair = acquireLockedPair(root->recrawlInfo, crawlState_);
12 
13   if (lockPair.second->promise && lockPair.second->future.valid()) {
14     return lockPair.second->future;
15   }
16 
17   if (root->inner.done_initial && !lockPair.first->shouldRecrawl) {
18     // Return an already satisfied future
19     std::promise<void> p;
20     p.set_value();
21     return p.get_future();
22   }
23 
24   // Not yet done, so queue up the promise
25   lockPair.second->promise = watchman::make_unique<std::promise<void>>();
26   lockPair.second->future =
27       std::shared_future<void>(lockPair.second->promise->get_future());
28   return lockPair.second->future;
29 }
30 
fullCrawl(const std::shared_ptr<w_root_t> & root,PendingCollection::LockedPtr & pending)31 void InMemoryView::fullCrawl(
32     const std::shared_ptr<w_root_t>& root,
33     PendingCollection::LockedPtr& pending) {
34   struct timeval start;
35 
36   w_perf_t sample("full-crawl");
37   if (config_.getBool("iothrottle", false)) {
38     w_ioprio_set_low();
39   }
40   {
41     auto view = view_.wlock();
42     // Ensure that we observe these files with a new, distinct clock,
43     // otherwise a fresh subscription established immediately after a watch
44     // can get stuck with an empty view until another change is observed
45     view->mostRecentTick++;
46     gettimeofday(&start, NULL);
47     pending_.wlock()->add(root->root_path, start, W_PENDING_RECURSIVE);
48     // There is the potential for a subtle race condition here.  The boolean
49     // parameter indicates whether we want to merge in the set of
50     // notifications pending from the watcher or not.  Since we now coalesce
51     // overlaps we must consume our outstanding set before we merge in any
52     // new kernel notification information or we risk missing out on
53     // observing changes that happen during the initial crawl.  This
54     // translates to a two level loop; the outer loop sweeps in data from
55     // inotify, then the inner loop processes it and any dirs that we pick up
56     // from recursive processing.
57     while (processPending(root, view, pending, true)) {
58       while (processPending(root, view, pending, false)) {
59         ;
60       }
61     }
62     {
63       auto lockPair = acquireLockedPair(root->recrawlInfo, crawlState_);
64       lockPair.first->shouldRecrawl = false;
65       if (lockPair.second->promise) {
66         lockPair.second->promise->set_value();
67         lockPair.second->promise.reset();
68       }
69       root->inner.done_initial = true;
70     }
71     root->cookies.abortAllCookies();
72   }
73   sample.add_root_meta(root);
74 
75   if (config_.getBool("iothrottle", false)) {
76     w_ioprio_set_normal();
77   }
78 
79   sample.finish();
80   sample.force_log();
81   sample.log();
82 
83   w_log(
84       W_LOG_ERR,
85       "%scrawl complete\n",
86       root->recrawlInfo.rlock()->recrawlCount ? "re" : "");
87 }
88 
89 // Performs settle-time actions.
90 // Returns true if the root was reaped and the io thread should terminate.
do_settle_things(const std::shared_ptr<w_root_t> & root)91 static bool do_settle_things(const std::shared_ptr<w_root_t>& root) {
92   // No new pending items were given to us, so consider that
93   // we may now be settled.
94 
95   root->processPendingSymlinkTargets();
96 
97   if (!root->inner.done_initial) {
98     // we need to recrawl, stop what we're doing here
99     return false;
100   }
101 
102   auto view = std::dynamic_pointer_cast<watchman::InMemoryView>(root->view());
103   w_assert(view, "we're called from InMemoryView, wat?");
104   view->warmContentCache();
105 
106   auto settledPayload = json_object({{"settled", json_true()}});
107   root->unilateralResponses->enqueue(std::move(settledPayload));
108 
109   if (root->considerReap()) {
110     root->stopWatch();
111     return true;
112   }
113 
114   root->considerAgeOut();
115   return false;
116 }
117 
clientModeCrawl(const std::shared_ptr<w_root_t> & root)118 void InMemoryView::clientModeCrawl(const std::shared_ptr<w_root_t>& root) {
119   PendingCollection pending;
120 
121   auto lock = pending.wlock();
122   fullCrawl(root, lock);
123 }
124 
handleShouldRecrawl(const std::shared_ptr<w_root_t> & root)125 bool InMemoryView::handleShouldRecrawl(const std::shared_ptr<w_root_t>& root) {
126   {
127     auto info = root->recrawlInfo.rlock();
128     if (!info->shouldRecrawl) {
129       return false;
130     }
131   }
132 
133   if (!root->inner.cancelled) {
134     auto info = root->recrawlInfo.wlock();
135     info->recrawlCount++;
136     root->inner.done_initial = false;
137   }
138 
139   return true;
140 }
141 
ioThread(const std::shared_ptr<w_root_t> & root)142 void InMemoryView::ioThread(const std::shared_ptr<w_root_t>& root) {
143   int timeoutms, biggest_timeout;
144   PendingCollection pending;
145   auto localPendingLock = pending.wlock();
146 
147   timeoutms = root->trigger_settle;
148 
149   // Upper bound on sleep delay.  These options are measured in seconds.
150   biggest_timeout = root->gc_interval;
151   if (biggest_timeout == 0 ||
152       (root->idle_reap_age != 0 && root->idle_reap_age < biggest_timeout)) {
153     biggest_timeout = root->idle_reap_age;
154   }
155   if (biggest_timeout == 0) {
156     biggest_timeout = 86400;
157   }
158   // And convert to milliseconds
159   biggest_timeout *= 1000;
160 
161   while (!stopThreads_) {
162     bool pinged;
163 
164     if (!root->inner.done_initial) {
165       /* first order of business is to find all the files under our root */
166       fullCrawl(root, localPendingLock);
167 
168       timeoutms = root->trigger_settle;
169     }
170 
171     // Wait for the notify thread to give us pending items, or for
172     // the settle period to expire
173     {
174       w_log(W_LOG_DBG, "poll_events timeout=%dms\n", timeoutms);
175       auto targetPendingLock =
176           pending_.lockAndWait(std::chrono::milliseconds(timeoutms), pinged);
177       w_log(W_LOG_DBG, " ... wake up (pinged=%s)\n", pinged ? "true" : "false");
178       localPendingLock->append(&*targetPendingLock);
179     }
180 
181     if (handleShouldRecrawl(root)) {
182       fullCrawl(root, localPendingLock);
183       timeoutms = root->trigger_settle;
184       continue;
185     }
186 
187     if (!pinged && localPendingLock->size() == 0) {
188       if (do_settle_things(root)) {
189         break;
190       }
191       timeoutms = std::min(biggest_timeout, timeoutms * 2);
192       continue;
193     }
194 
195     // Otherwise we have pending items to stat and crawl
196 
197     // We are now, by definition, unsettled, so reduce sleep timeout
198     // to the settle duration ready for the next loop through
199     timeoutms = root->trigger_settle;
200 
201     {
202       auto view = view_.wlock();
203       if (!root->inner.done_initial) {
204         // we need to recrawl.  Discard these notifications
205         localPendingLock->drain();
206         continue;
207       }
208 
209       view->mostRecentTick++;
210 
211       while (processPending(root, view, localPendingLock, false)) {
212         ;
213       }
214     }
215   }
216 }
217 
processPath(const std::shared_ptr<w_root_t> & root,SyncView::LockedPtr & view,PendingCollection::LockedPtr & coll,const w_string & full_path,struct timeval now,int flags,const watchman_dir_ent * pre_stat)218 void InMemoryView::processPath(
219     const std::shared_ptr<w_root_t>& root,
220     SyncView::LockedPtr& view,
221     PendingCollection::LockedPtr& coll,
222     const w_string& full_path,
223     struct timeval now,
224     int flags,
225     const watchman_dir_ent* pre_stat) {
226   /* From a particular query's point of view, there are four sorts of cookies we
227    * can observe:
228    * 1. Cookies that this query has created. This marks the end of this query's
229    *    sync_to_now, so we hide it from the results.
230    * 2. Cookies that another query on the same watch by the same process has
231    *    created. This marks the end of that other query's sync_to_now, so from
232    *    the point of view of this query we turn a blind eye to it.
233    * 3. Cookies created by another process on the same watch. We're independent
234    *    of other processes, so we report these.
235    * 4. Cookies created by a nested watch by the same or a different process.
236    *    We're independent of other watches, so we report these.
237    *
238    * The below condition is true for cases 1 and 2 and false for 3 and 4.
239    */
240   if (w_string_startswith(full_path, cookies_.cookiePrefix())) {
241     bool consider_cookie =
242         (watcher_->flags & WATCHER_HAS_PER_FILE_NOTIFICATIONS)
243         ? ((flags & W_PENDING_VIA_NOTIFY) || !root->inner.done_initial)
244         : true;
245 
246     if (consider_cookie) {
247       cookies_.notifyCookie(full_path);
248     }
249 
250     // Never allow cookie files to show up in the tree
251     return;
252   }
253 
254   if (w_string_equal(full_path, root_path) ||
255       (flags & W_PENDING_CRAWL_ONLY) == W_PENDING_CRAWL_ONLY) {
256     crawler(
257         root,
258         view,
259         coll,
260         full_path,
261         now,
262         (flags & W_PENDING_RECURSIVE) == W_PENDING_RECURSIVE);
263   } else {
264     statPath(root, view, coll, full_path, now, flags, pre_stat);
265   }
266 }
267 
processPending(const std::shared_ptr<w_root_t> & root,SyncView::LockedPtr & view,PendingCollection::LockedPtr & coll,bool pullFromRoot)268 bool InMemoryView::processPending(
269     const std::shared_ptr<w_root_t>& root,
270     SyncView::LockedPtr& view,
271     PendingCollection::LockedPtr& coll,
272     bool pullFromRoot) {
273   if (pullFromRoot) {
274     auto srcLock = pending_.wlock();
275     coll->append(&*srcLock);
276   }
277 
278   if (!coll->size()) {
279     return false;
280   }
281 
282   w_log(
283       W_LOG_DBG,
284       "processing %d events in %s\n",
285       coll->size(),
286       root_path.c_str());
287 
288   auto pending = coll->stealItems();
289 
290   while (pending) {
291     if (!stopThreads_) {
292       processPath(
293           root,
294           view,
295           coll,
296           pending->path,
297           pending->now,
298           pending->flags,
299           nullptr);
300     }
301 
302     pending = std::move(pending->next);
303   }
304 
305   return true;
306 }
307 }
308 
309 /* vim:ts=2:sw=2:et:
310  */
311