1 #include "import_pipeline.h"
2 
3 #include "cache_manager.h"
4 #include "config.h"
5 #include "diagnostics_engine.h"
6 #include "iindexer.h"
7 #include "import_manager.h"
8 #include "lsp.h"
9 #include "message_handler.h"
10 #include "platform.h"
11 #include "project.h"
12 #include "query_utils.h"
13 #include "queue_manager.h"
14 #include "timer.h"
15 #include "timestamp_manager.h"
16 
17 #include <doctest/doctest.h>
18 #include <loguru.hpp>
19 
20 #include <atomic>
21 #include <chrono>
22 #include <string>
23 #include <vector>
24 
25 namespace {
26 
// Progress message for the import pipeline, emitted under the
// "$cquery/progress" method. ActiveThread::EmitProgress fills |params| from
// the QueueManager queue sizes and the active thread counter.
struct Out_Progress : public lsOutMessage<Out_Progress> {
  struct Params {
    // Size of QueueManager::index_request.
    int indexRequestCount = 0;
    // Size of QueueManager::do_id_map.
    int doIdMapCount = 0;
    // Size of QueueManager::on_id_mapped.
    int onIdMappedCount = 0;
    // Combined size of QueueManager::on_indexed_for_merge and
    // QueueManager::on_indexed_for_querydb.
    int onIndexedCount = 0;
    // Value of ImportPipelineStatus::num_active_threads.
    int activeThreads = 0;
  };
  std::string method = "$cquery/progress";
  Params params;
};
// Register the fields with the reflection macro.
// NOTE(review): presumably this drives JSON serialization and |jsonrpc| is a
// member inherited from lsOutMessage - confirm against lsp.h.
MAKE_REFLECT_STRUCT(Out_Progress::Params,
                    indexRequestCount,
                    doIdMapCount,
                    onIdMappedCount,
                    onIndexedCount,
                    activeThreads);
MAKE_REFLECT_STRUCT(Out_Progress, jsonrpc, method, params);
45 
// Upper bound on how many messages of a single type are handled in one pass.
// Rather than draining a queue completely before moving on, work is done in
// batches of up to |kIterationSize| messages. The total import time likely
// stays the same, but overall queue lengths shrink, so the user gets a usable
// index sooner.
constexpr int kIterationSize = 200;

// Countdown budget for one batch of queue processing.
struct IterationLoop {
  int remaining = kIterationSize;

  // Multiplies whatever budget is currently left by 50.
  void IncreaseCount() { remaining = remaining * 50; }

  // Consumes one unit of budget. Returns true if budget was still available
  // before the call; the counter is decremented either way.
  bool Next() {
    const bool had_budget = remaining > 0;
    --remaining;
    return had_budget;
  }

  // Restores the default budget of |kIterationSize|.
  void Reset() { remaining = kIterationSize; }
};
58 
// Abstraction over looking up a file's modification time, so that callers
// such as FileNeedsParse can be driven by a fake implementation.
struct IModificationTimestampFetcher {
  virtual ~IModificationTimestampFetcher() = default;
  // Returns the modification time of |path|. Callers in this file treat an
  // empty optional as "the file cannot be found".
  virtual optional<int64_t> GetModificationTime(const AbsolutePath& path) = 0;
};
63 struct RealModificationTimestampFetcher : IModificationTimestampFetcher {
64   ~RealModificationTimestampFetcher() override = default;
65 
66   // IModificationTimestamp:
GetModificationTime__anonbc37f63c0111::RealModificationTimestampFetcher67   optional<int64_t> GetModificationTime(const AbsolutePath& path) override {
68     return GetLastModificationTime(path);
69   }
70 };
71 struct FakeModificationTimestampFetcher : IModificationTimestampFetcher {
72   std::unordered_map<std::string, optional<int64_t>> entries;
73 
74   ~FakeModificationTimestampFetcher() override = default;
75 
76   // IModificationTimestamp:
GetModificationTime__anonbc37f63c0111::FakeModificationTimestampFetcher77   optional<int64_t> GetModificationTime(const AbsolutePath& path) override {
78     auto it = entries.find(path);
79     assert(it != entries.end());
80     return it->second;
81   }
82 };
83 
GetCurrentTimeInMilliseconds()84 long long GetCurrentTimeInMilliseconds() {
85   auto time_since_epoch = Timer::Clock::now().time_since_epoch();
86   long long elapsed_milliseconds =
87       std::chrono::duration_cast<std::chrono::milliseconds>(time_since_epoch)
88           .count();
89   return elapsed_milliseconds;
90 }
91 
92 struct ActiveThread {
ActiveThread__anonbc37f63c0111::ActiveThread93   ActiveThread(ImportPipelineStatus* status) : status_(status) {
94     if (g_config->progressReportFrequencyMs < 0)
95       return;
96 
97     ++status_->num_active_threads;
98   }
~ActiveThread__anonbc37f63c0111::ActiveThread99   ~ActiveThread() {
100     if (g_config->progressReportFrequencyMs < 0)
101       return;
102 
103     --status_->num_active_threads;
104     EmitProgress();
105   }
106 
107   // Send indexing progress to client if reporting is enabled.
EmitProgress__anonbc37f63c0111::ActiveThread108   void EmitProgress() {
109     auto* queue = QueueManager::instance();
110     Out_Progress out;
111     out.params.indexRequestCount = queue->index_request.Size();
112     out.params.doIdMapCount = queue->do_id_map.Size();
113     out.params.onIdMappedCount = queue->on_id_mapped.Size();
114     out.params.onIndexedCount = queue->on_indexed_for_merge.Size() +
115                                 queue->on_indexed_for_querydb.Size();
116     out.params.activeThreads = status_->num_active_threads;
117 
118     // Ignore this progress update if the last update was too recent.
119     if (g_config->progressReportFrequencyMs != 0) {
120       // Make sure we output a status update if queue lengths are zero.
121       bool all_zero =
122           out.params.indexRequestCount == 0 && out.params.doIdMapCount == 0 &&
123           out.params.onIdMappedCount == 0 && out.params.onIndexedCount == 0 &&
124           out.params.activeThreads == 0;
125       if (!all_zero &&
126           GetCurrentTimeInMilliseconds() < status_->next_progress_output)
127         return;
128       status_->next_progress_output =
129           GetCurrentTimeInMilliseconds() + g_config->progressReportFrequencyMs;
130     }
131 
132     QueueManager::WriteStdout(kMethodType_Unknown, out);
133   }
134 
135   ImportPipelineStatus* status_;
136 };
137 
138 enum class ShouldParse { Yes, No, NoSuchFile };
139 
140 // Checks if |path| needs to be reparsed. This will modify cached state
141 // such that calling this function twice with the same path may return true
142 // the first time but will return false the second.
143 //
144 // |from|: The file which generated the parse request for this file.
FileNeedsParse(bool is_interactive,TimestampManager * timestamp_manager,IModificationTimestampFetcher * modification_timestamp_fetcher,ImportManager * import_manager,const std::shared_ptr<ICacheManager> & cache_manager,IndexFile * opt_previous_index,const AbsolutePath & path,const std::vector<std::string> & args,const optional<AbsolutePath> & from)145 ShouldParse FileNeedsParse(
146     bool is_interactive,
147     TimestampManager* timestamp_manager,
148     IModificationTimestampFetcher* modification_timestamp_fetcher,
149     ImportManager* import_manager,
150     const std::shared_ptr<ICacheManager>& cache_manager,
151     IndexFile* opt_previous_index,
152     const AbsolutePath& path,
153     const std::vector<std::string>& args,
154     const optional<AbsolutePath>& from) {
155   auto unwrap_opt = [](const optional<AbsolutePath>& opt) -> std::string {
156     if (opt)
157       return " (via " + opt->path + ")";
158     return "";
159   };
160 
161   // If the file is a dependency but another file as already imported it,
162   // don't bother.
163   if (!is_interactive && from &&
164       !import_manager->TryMarkDependencyImported(path)) {
165     return ShouldParse::No;
166   }
167 
168   optional<int64_t> modification_timestamp =
169       modification_timestamp_fetcher->GetModificationTime(path);
170 
171   // Cannot find file.
172   if (!modification_timestamp)
173     return ShouldParse::NoSuchFile;
174 
175   optional<int64_t> last_cached_modification =
176       timestamp_manager->GetLastCachedModificationTime(cache_manager.get(),
177                                                        path);
178 
179   // File has been changed.
180   if (!last_cached_modification ||
181       modification_timestamp != *last_cached_modification) {
182     LOG_S(INFO) << "Timestamp has changed for " << path << unwrap_opt(from);
183     return ShouldParse::Yes;
184   }
185 
186   // Command-line arguments changed.
187   auto is_file = [](const std::string& arg) {
188     return EndsWithAny(arg, {".h", ".c", ".cc", ".cpp", ".hpp", ".m", ".mm"});
189   };
190   if (opt_previous_index) {
191     auto& prev_args = opt_previous_index->args;
192     bool same = prev_args.size() == args.size();
193     for (size_t i = 0; i < args.size() && same; ++i) {
194       same = prev_args[i] == args[i] ||
195              (is_file(prev_args[i]) && is_file(args[i]));
196     }
197     if (!same) {
198       LOG_S(INFO) << "Arguments have changed for " << path << unwrap_opt(from);
199       return ShouldParse::Yes;
200     }
201   }
202 
203   // File has not changed, do not parse it.
204   return ShouldParse::No;
205 };
206 
207 enum CacheLoadResult { Parse, DoNotParse };
TryLoadFromCache(FileConsumerSharedState * file_consumer_shared,TimestampManager * timestamp_manager,IModificationTimestampFetcher * modification_timestamp_fetcher,ImportManager * import_manager,const std::shared_ptr<ICacheManager> & cache_manager,bool is_interactive,const Project::Entry & entry,const AbsolutePath & path_to_index)208 CacheLoadResult TryLoadFromCache(
209     FileConsumerSharedState* file_consumer_shared,
210     TimestampManager* timestamp_manager,
211     IModificationTimestampFetcher* modification_timestamp_fetcher,
212     ImportManager* import_manager,
213     const std::shared_ptr<ICacheManager>& cache_manager,
214     bool is_interactive,
215     const Project::Entry& entry,
216     const AbsolutePath& path_to_index) {
217   // Always run this block, even if we are interactive, so we can check
218   // dependencies and reset files in |file_consumer_shared|.
219   IndexFile* previous_index = cache_manager->TryLoad(path_to_index);
220   if (!previous_index)
221     return CacheLoadResult::Parse;
222 
223   // If none of the dependencies have changed and the index is not
224   // interactive (ie, requested by a file save), skip parsing and just load
225   // from cache.
226 
227   // Check timestamps and update |file_consumer_shared|.
228   ShouldParse path_state = FileNeedsParse(
229       is_interactive, timestamp_manager, modification_timestamp_fetcher,
230       import_manager, cache_manager, previous_index, path_to_index, entry.args,
231       nullopt);
232   if (path_state == ShouldParse::Yes)
233     file_consumer_shared->Reset(path_to_index);
234 
235   // Target file does not exist on disk, do not emit any indexes.
236   // TODO: Dependencies should be reassigned to other files. We can do this by
237   // updating the "primary_file" if it doesn't exist. Might not actually be a
238   // problem in practice.
239   if (path_state == ShouldParse::NoSuchFile)
240     return CacheLoadResult::DoNotParse;
241 
242   bool needs_reparse = is_interactive || path_state == ShouldParse::Yes;
243 
244   for (const AbsolutePath& dependency : previous_index->dependencies) {
245     assert(!dependency.path.empty());
246 
247     if (FileNeedsParse(is_interactive, timestamp_manager,
248                        modification_timestamp_fetcher, import_manager,
249                        cache_manager, previous_index, dependency, entry.args,
250                        previous_index->path) == ShouldParse::Yes) {
251       needs_reparse = true;
252 
253       // Do not break here, as we need to update |file_consumer_shared| for
254       // every dependency that needs to be reparsed.
255       file_consumer_shared->Reset(dependency);
256     }
257   }
258 
259   // FIXME: should we still load from cache?
260   if (needs_reparse)
261     return CacheLoadResult::Parse;
262 
263   // No timestamps changed - load directly from cache.
264   LOG_S(INFO) << "Skipping parse; no timestamp change for " << path_to_index;
265 
266   std::vector<Index_DoIdMap> result;
267   result.push_back(Index_DoIdMap(cache_manager->TakeOrLoad(path_to_index),
268                                  cache_manager, is_interactive,
269                                  false /*write_to_disk*/));
270   for (const AbsolutePath& dependency : previous_index->dependencies) {
271     // Only load a dependency if it is not already loaded.
272     //
273     // This is important for perf in large projects where there are lots of
274     // dependencies shared between many files.
275     if (!file_consumer_shared->Mark(dependency))
276       continue;
277 
278     LOG_S(INFO) << "Emitting index result for " << dependency << " (via "
279                 << previous_index->path << ")";
280 
281     std::unique_ptr<IndexFile> dependency_index =
282         cache_manager->TryTakeOrLoad(dependency);
283 
284     // |dependency_index| may be null if there is no cache for it but
285     // another file has already started importing it.
286     if (!dependency_index)
287       continue;
288 
289     result.push_back(Index_DoIdMap(std::move(dependency_index), cache_manager,
290                                    is_interactive, false /*write_to_disk*/));
291   }
292 
293   // Mark all of the entries as imported so we will load do delta loads next
294   // time.
295   for (Index_DoIdMap& entry : result)
296     import_manager->IsInitialImport(entry.current->path);
297 
298   QueueManager::instance()->do_id_map.EnqueueAll(std::move(result),
299                                                  false /*priority*/);
300   return CacheLoadResult::DoNotParse;
301 }
302 
PreloadFileContents(const std::shared_ptr<ICacheManager> & cache_manager,const Project::Entry & entry,const optional<std::string> & entry_contents,const AbsolutePath & path_to_index)303 std::vector<FileContents> PreloadFileContents(
304     const std::shared_ptr<ICacheManager>& cache_manager,
305     const Project::Entry& entry,
306     const optional<std::string>& entry_contents,
307     const AbsolutePath& path_to_index) {
308   // Load file contents for all dependencies into memory. If the dependencies
309   // for the file changed we may not end up using all of the files we
310   // preloaded. If a new dependency was added the indexer will grab the file
311   // contents as soon as possible.
312   //
313   // We do this to minimize the race between indexing a file and capturing the
314   // file contents.
315   //
316   // TODO: We might be able to optimize perf by only copying for files in
317   //       working_files. We can pass that same set of files to the indexer as
318   //       well. We then default to a fast file-copy if not in working set.
319 
320   // index->file_contents comes from cache, so we need to check if that cache is
321   // still valid. if so, we can use it, otherwise we need to load from disk.
322   auto get_latest_content = [](const AbsolutePath& path, int64_t cached_time,
323                                const std::string& cached) -> std::string {
324     optional<int64_t> mod_time = GetLastModificationTime(path);
325     if (!mod_time)
326       return "";
327 
328     if (*mod_time == cached_time)
329       return cached;
330 
331     optional<std::string> fresh_content = ReadContent(path);
332     if (!fresh_content) {
333       LOG_S(ERROR) << "Failed to load content for " << path;
334       return "";
335     }
336     return *fresh_content;
337   };
338 
339   std::vector<FileContents> file_contents;
340   if (entry_contents)
341     file_contents.push_back(FileContents(entry.filename, *entry_contents));
342   cache_manager->IterateLoadedCaches([&](IndexFile* index) {
343     if (entry_contents && index->path == entry.filename)
344       return;
345     file_contents.push_back(FileContents(
346         index->path,
347         get_latest_content(index->path, index->last_modification_time,
348                            index->file_contents)));
349   });
350 
351   return file_contents;
352 }
353 
ParseFile(DiagnosticsEngine * diag_engine,WorkingFiles * working_files,FileConsumerSharedState * file_consumer_shared,TimestampManager * timestamp_manager,IModificationTimestampFetcher * modification_timestamp_fetcher,ImportManager * import_manager,IIndexer * indexer,const Index_Request & request,const Project::Entry & entry)354 void ParseFile(DiagnosticsEngine* diag_engine,
355                WorkingFiles* working_files,
356                FileConsumerSharedState* file_consumer_shared,
357                TimestampManager* timestamp_manager,
358                IModificationTimestampFetcher* modification_timestamp_fetcher,
359                ImportManager* import_manager,
360                IIndexer* indexer,
361                const Index_Request& request,
362                const Project::Entry& entry) {
363   // If the file is inferred, we may not actually be able to parse that file
364   // directly (ie, a header file, which are not listed in the project). If this
365   // file is inferred, then try to use the file which originally imported it.
366   // FIXME: don't use absolute path
367   AbsolutePath path_to_index = entry.filename;
368   if (entry.is_inferred) {
369     IndexFile* entry_cache = request.cache_manager->TryLoad(entry.filename);
370     if (entry_cache)
371       path_to_index = entry_cache->import_file;
372   }
373 
374   // Try to load the file from cache.
375   if (TryLoadFromCache(file_consumer_shared, timestamp_manager,
376                        modification_timestamp_fetcher, import_manager,
377                        request.cache_manager, request.is_interactive, entry,
378                        path_to_index) == CacheLoadResult::DoNotParse) {
379     return;
380   }
381 
382   LOG_S(INFO) << "Parsing " << path_to_index;
383   std::vector<FileContents> file_contents = PreloadFileContents(
384       request.cache_manager, entry, request.contents, path_to_index);
385 
386   std::vector<Index_DoIdMap> result;
387   auto indexes = indexer->Index(file_consumer_shared, path_to_index, entry.args,
388                                 file_contents);
389 
390   if (!indexes) {
391     if (g_config->index.enabled && request.id.has_value()) {
392       Out_Error out;
393       out.id = request.id;
394       out.error.code = lsErrorCodes::InternalError;
395       out.error.message = "Failed to index " + path_to_index.path;
396       QueueManager::WriteStdout(kMethodType_Unknown, out);
397     }
398     return;
399   }
400 
401   for (std::unique_ptr<IndexFile>& new_index : *indexes) {
402     // Only emit diagnostics for non-interactive sessions, which makes it easier
403     // to identify indexing problems. For interactive sessions, diagnostics are
404     // handled by code completion.
405     if (!request.is_interactive) {
406       diag_engine->Publish(working_files, new_index->path,
407                            new_index->diagnostics_);
408     }
409 
410     // When main thread does IdMap request it will request the previous index if
411     // needed.
412     LOG_S(INFO) << "Emitting index result for " << new_index->path;
413     result.push_back(Index_DoIdMap(std::move(new_index), request.cache_manager,
414                                    request.is_interactive,
415                                    true /*write_to_disk*/));
416   }
417 
418   // Load previous index if the file has already been imported so we can do a
419   // delta update.
420   for (Index_DoIdMap& request : result) {
421     if (!import_manager->IsInitialImport(request.current->path)) {
422       request.previous =
423           request.cache_manager->TryTakeOrLoad(request.current->path);
424       LOG_IF_S(ERROR, !request.previous)
425           << "Unable to load previous index for already imported index "
426           << request.current->path;
427     }
428   }
429 
430   // Write index to disk if requested.
431   for (Index_DoIdMap& request : result) {
432     if (request.write_to_disk) {
433       LOG_S(INFO) << "Writing cached index to disk for "
434                   << request.current->path;
435       request.cache_manager->WriteToCache(*request.current);
436       timestamp_manager->UpdateCachedModificationTime(
437           request.current->path, request.current->last_modification_time);
438     }
439   }
440 
441   QueueManager::instance()->do_id_map.EnqueueAll(std::move(result),
442                                                  request.is_interactive);
443 }
444 
IndexMain_DoParse(DiagnosticsEngine * diag_engine,WorkingFiles * working_files,FileConsumerSharedState * file_consumer_shared,TimestampManager * timestamp_manager,IModificationTimestampFetcher * modification_timestamp_fetcher,ImportManager * import_manager,IIndexer * indexer)445 bool IndexMain_DoParse(
446     DiagnosticsEngine* diag_engine,
447     WorkingFiles* working_files,
448     FileConsumerSharedState* file_consumer_shared,
449     TimestampManager* timestamp_manager,
450     IModificationTimestampFetcher* modification_timestamp_fetcher,
451     ImportManager* import_manager,
452     IIndexer* indexer) {
453   auto* queue = QueueManager::instance();
454   optional<Index_Request> request =
455       queue->index_request.TryDequeue(true /*priority*/);
456   if (!request)
457     return false;
458 
459   Project::Entry entry;
460   entry.filename = request->path;
461   entry.args = request->args;
462   ParseFile(diag_engine, working_files, file_consumer_shared, timestamp_manager,
463             modification_timestamp_fetcher, import_manager, indexer,
464             request.value(), entry);
465   return true;
466 }
467 
IndexMain_DoCreateIndexUpdate(TimestampManager * timestamp_manager)468 bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) {
469   auto* queue = QueueManager::instance();
470 
471   bool did_work = false;
472   IterationLoop loop;
473   while (loop.Next()) {
474     optional<Index_OnIdMapped> response =
475         queue->on_id_mapped.TryDequeue(true /*priority*/);
476     if (!response)
477       return did_work;
478 
479     did_work = true;
480 
481     IdMap* previous_id_map = nullptr;
482     IndexFile* previous_index = nullptr;
483     if (response->previous) {
484       previous_id_map = response->previous->ids.get();
485       previous_index = response->previous->file.get();
486     }
487 
488     // Build delta update.
489     IndexUpdate update =
490         IndexUpdate::CreateDelta(previous_id_map, response->current->ids.get(),
491                                  previous_index, response->current->file.get());
492     LOG_S(INFO) << "Built index update for " << response->current->file->path
493                 << " (is_delta=" << !!response->previous << ")";
494 
495     Index_OnIndexed reply(std::move(update));
496     const int kMaxSizeForQuerydb = 1000;
497     ThreadedQueue<Index_OnIndexed>& q =
498         queue->on_indexed_for_querydb.Size() < kMaxSizeForQuerydb
499             ? queue->on_indexed_for_querydb
500             : queue->on_indexed_for_merge;
501     q.Enqueue(std::move(reply), response->is_interactive /*priority*/);
502   }
503 
504   return did_work;
505 }
506 
IndexMergeIndexUpdates()507 bool IndexMergeIndexUpdates() {
508   // Merge low-priority requests, since priority requests should get serviced
509   // by querydb asap.
510 
511   auto* queue = QueueManager::instance();
512   optional<Index_OnIndexed> root =
513       queue->on_indexed_for_merge.TryDequeue(false /*priority*/);
514   if (!root)
515     return false;
516 
517   bool did_merge = false;
518   IterationLoop loop;
519   while (loop.Next()) {
520     optional<Index_OnIndexed> to_join =
521         queue->on_indexed_for_merge.TryDequeue(false /*priority*/);
522     if (!to_join)
523       break;
524     did_merge = true;
525     root->update.Merge(std::move(to_join->update));
526   }
527 
528   const int kMaxSizeForQuerydb = 1500;
529   ThreadedQueue<Index_OnIndexed>& q =
530       queue->on_indexed_for_querydb.Size() < kMaxSizeForQuerydb
531           ? queue->on_indexed_for_querydb
532           : queue->on_indexed_for_merge;
533   q.Enqueue(std::move(*root), false /*priority*/);
534   return did_merge;
535 }
536 
537 }  // namespace
538 
// Starts with no active threads and a progress deadline of 0.
// NOTE(review): with a deadline of 0 the first throttled call to
// ActiveThread::EmitProgress should not be suppressed, assuming the clock
// reading is non-negative - confirm.
ImportPipelineStatus::ImportPipelineStatus()
    : num_active_threads(0), next_progress_output(0) {}
541 
Indexer_Main(DiagnosticsEngine * diag_engine,FileConsumerSharedState * file_consumer_shared,TimestampManager * timestamp_manager,ImportManager * import_manager,ImportPipelineStatus * status,Project * project,WorkingFiles * working_files)542 void Indexer_Main(DiagnosticsEngine* diag_engine,
543                   FileConsumerSharedState* file_consumer_shared,
544                   TimestampManager* timestamp_manager,
545                   ImportManager* import_manager,
546                   ImportPipelineStatus* status,
547                   Project* project,
548                   WorkingFiles* working_files) {
549   RealModificationTimestampFetcher modification_timestamp_fetcher;
550   auto* queue = QueueManager::instance();
551   // Build one index per-indexer, as building the index acquires a global lock.
552   auto indexer = IIndexer::MakeClangIndexer();
553 
554   while (true) {
555     bool did_work = false;
556 
557     {
558       ActiveThread active_thread(status);
559 
560       // TODO: process all off IndexMain_DoIndex before calling
561       // IndexMain_DoCreateIndexUpdate for better icache behavior. We need to
562       // have some threads spinning on both though otherwise memory usage will
563       // get bad.
564 
565       // We need to make sure to run both IndexMain_DoParse and
566       // IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
567       // work. Running both also lets the user query the partially constructed
568       // index.
569       did_work =
570           IndexMain_DoParse(diag_engine, working_files, file_consumer_shared,
571                             timestamp_manager, &modification_timestamp_fetcher,
572                             import_manager, indexer.get()) ||
573           did_work;
574 
575       did_work = IndexMain_DoCreateIndexUpdate(timestamp_manager) || did_work;
576 
577       // Nothing to index and no index updates to create, so join some already
578       // created index updates to reduce work on querydb thread.
579       if (!did_work)
580         did_work = IndexMergeIndexUpdates() || did_work;
581     }
582 
583     // We didn't do any work, so wait for a notification.
584     if (!did_work) {
585       QueueManager::instance()->indexer_waiter->Wait(
586           &queue->index_request, &queue->on_id_mapped,
587           &queue->load_previous_index, &queue->on_indexed_for_merge);
588     }
589   }
590 }
591 
592 namespace {
QueryDb_DoIdMap(QueueManager * queue,QueryDatabase * db,ImportManager * import_manager,Index_DoIdMap * request)593 void QueryDb_DoIdMap(QueueManager* queue,
594                      QueryDatabase* db,
595                      ImportManager* import_manager,
596                      Index_DoIdMap* request) {
597   assert(request->current);
598 
599   // Check if the file is already being imported into querydb. If it is, drop
600   // the request.
601   //
602   // Note, we must do this *after* we have checked for the previous index,
603   // otherwise we will never actually generate the IdMap.
604   if (!import_manager->StartQueryDbImport(request->current->path)) {
605     LOG_S(INFO) << "Dropping index as it is already being imported for "
606                 << request->current->path;
607     return;
608   }
609 
610   Index_OnIdMapped response(request->cache_manager, request->is_interactive,
611                             request->write_to_disk);
612   auto make_map = [db](std::unique_ptr<IndexFile> file)
613       -> std::unique_ptr<Index_OnIdMapped::File> {
614     if (!file)
615       return nullptr;
616 
617     auto id_map = std::make_unique<IdMap>(db, file->id_cache);
618     return std::make_unique<Index_OnIdMapped::File>(std::move(file),
619                                                     std::move(id_map));
620   };
621   response.current = make_map(std::move(request->current));
622   response.previous = make_map(std::move(request->previous));
623 
624   queue->on_id_mapped.Enqueue(std::move(response),
625                               response.is_interactive /*priority*/);
626 }
627 
QueryDb_OnIndexed(QueueManager * queue,QueryDatabase * db,ImportManager * import_manager,ImportPipelineStatus * status,SemanticHighlightSymbolCache * semantic_cache,WorkingFiles * working_files,Index_OnIndexed * response)628 void QueryDb_OnIndexed(QueueManager* queue,
629                        QueryDatabase* db,
630                        ImportManager* import_manager,
631                        ImportPipelineStatus* status,
632                        SemanticHighlightSymbolCache* semantic_cache,
633                        WorkingFiles* working_files,
634                        Index_OnIndexed* response) {
635   Timer time;
636   db->ApplyIndexUpdate(&response->update);
637   time.ResetAndPrint("Applying index update for " +
638                      StringJoinMap(response->update.files_def_update,
639                                    [](const QueryFile::DefUpdate& value) {
640                                      return value.value.path;
641                                    }));
642 
643   // Update indexed content, inactive lines, and semantic highlighting.
644   for (auto& updated_file : response->update.files_def_update) {
645     WorkingFile* working_file =
646         working_files->GetFileByFilename(updated_file.value.path);
647     if (working_file) {
648       // Update indexed content.
649       working_file->SetIndexContent(updated_file.file_content);
650 
651       // Inactive lines.
652       EmitInactiveLines(working_file, updated_file.value.inactive_regions);
653 
654       // Semantic highlighting.
655       QueryId::File file_id = db->usr_to_file[working_file->filename];
656       QueryFile* file = &db->files[file_id.id];
657       EmitSemanticHighlighting(db, semantic_cache, working_file, file);
658     }
659 
660     // Mark the files as being done in querydb stage after we apply the index
661     // update.
662     import_manager->DoneQueryDbImport(updated_file.value.path);
663   }
664 }
665 
666 }  // namespace
667 
QueryDb_ImportMain(QueryDatabase * db,ImportManager * import_manager,ImportPipelineStatus * status,SemanticHighlightSymbolCache * semantic_cache,WorkingFiles * working_files)668 bool QueryDb_ImportMain(QueryDatabase* db,
669                         ImportManager* import_manager,
670                         ImportPipelineStatus* status,
671                         SemanticHighlightSymbolCache* semantic_cache,
672                         WorkingFiles* working_files) {
673   auto* queue = QueueManager::instance();
674 
675   ActiveThread active_thread(status);
676 
677   bool did_work = false;
678 
679   IterationLoop loop;
680   loop.IncreaseCount();
681   while (loop.Next()) {
682     optional<Index_DoIdMap> request =
683         queue->do_id_map.TryDequeue(true /*priority*/);
684     if (!request)
685       break;
686     did_work = true;
687     QueryDb_DoIdMap(queue, db, import_manager, &*request);
688   }
689 
690   loop.Reset();
691   while (loop.Next()) {
692     optional<Index_OnIndexed> response =
693         queue->on_indexed_for_querydb.TryDequeue(true /*priority*/);
694     if (!response)
695       break;
696     did_work = true;
697     QueryDb_OnIndexed(queue, db, import_manager, status, semantic_cache,
698                       working_files, &*response);
699   }
700 
701   return did_work;
702 }
703 
704 TEST_SUITE("ImportPipeline") {
  // Test fixture wiring IndexMain_DoParse to a fake cache manager, a test
  // indexer and a fake modification timestamp fetcher.
  struct Fixture {
    // Initializes the QueueManager and creates the fake collaborators.
    Fixture() {
      QueueManager::Init();

      queue = QueueManager::instance();
      cache_manager = ICacheManager::MakeFake({});
      indexer = IIndexer::MakeTestIndexer({});
      diag_engine.Init();
    }

    // Processes at most one queued index request. Returns false if the
    // request queue was empty.
    bool PumpOnce() {
      return IndexMain_DoParse(&diag_engine, &working_files,
                               &file_consumer_shared, &timestamp_manager,
                               &modification_timestamp_fetcher, &import_manager,
                               indexer.get());
    }

    // Enqueues a non-priority index request for |path| that uses the
    // fixture's cache manager.
    void MakeRequest(const std::string& path,
                     const std::vector<std::string>& args = {},
                     bool is_interactive = false,
                     const std::string& contents = "void foo();") {
      queue->index_request.Enqueue(
          Index_Request(path, args, is_interactive, contents, cache_manager),
          false /*priority*/);
    }

    // Set in the constructor from QueueManager::instance().
    QueueManager* queue = nullptr;
    DiagnosticsEngine diag_engine;
    WorkingFiles working_files;
    FileConsumerSharedState file_consumer_shared;
    TimestampManager timestamp_manager;
    // Tests register the timestamps to report through its |entries| map.
    FakeModificationTimestampFetcher modification_timestamp_fetcher;
    ImportManager import_manager;
    std::shared_ptr<ICacheManager> cache_manager;
    std::unique_ptr<IIndexer> indexer;
  };
741 
742   TEST_CASE_FIXTURE(Fixture, "FileNeedsParse") {
743     auto check = [&](const std::string& file, bool is_dependency = false,
744                      bool is_interactive = false,
745                      const std::vector<std::string>& old_args = {},
__anonbc37f63c0902(const std::string& file, bool is_dependency = false, bool is_interactive = false, const std::vector<std::string>& old_args = {}, const std::vector<std::string>& new_args = {}) 746                      const std::vector<std::string>& new_args = {}) {
747       std::unique_ptr<IndexFile> opt_previous_index;
748       if (!old_args.empty()) {
749         opt_previous_index = std::make_unique<IndexFile>(
750             AbsolutePath("---.cc", false /*validate*/), "<empty>");
751         opt_previous_index->args = old_args;
752       }
753       optional<AbsolutePath> from;
754       if (is_dependency)
755         from = AbsolutePath("---.cc", false /*validate*/);
756       return FileNeedsParse(
757           is_interactive /*is_interactive*/, &timestamp_manager,
758           &modification_timestamp_fetcher, &import_manager, cache_manager,
759           opt_previous_index.get(), AbsolutePath(file, false /*validate*/),
760           new_args, from);
761     };
762 
763     // A file with no timestamp is not imported, since this implies the file no
764     // longer exists on disk.
765     modification_timestamp_fetcher.entries["bar.h"] = nullopt;
766     REQUIRE(check("bar.h", false /*is_dependency*/) == ShouldParse::NoSuchFile);
767 
768     // A dependency is only imported once.
769     modification_timestamp_fetcher.entries["foo.h"] = 5;
770     REQUIRE(check("foo.h", true /*is_dependency*/) == ShouldParse::Yes);
771     REQUIRE(check("foo.h", true /*is_dependency*/) == ShouldParse::No);
772 
773     // An interactive dependency is imported.
774     REQUIRE(check("foo.h", true /*is_dependency*/) == ShouldParse::No);
775     REQUIRE(check("foo.h", true /*is_dependency*/, true /*is_interactive*/) ==
776             ShouldParse::Yes);
777 
778     // A file whose timestamp has not changed is not imported. When the
779     // timestamp changes (either forward or backward) it is reimported.
__anonbc37f63c0a02(int64_t timestamp) 780     auto check_timestamp_change = [&](int64_t timestamp) {
781       modification_timestamp_fetcher.entries["aa.cc"] = timestamp;
782       REQUIRE(check("aa.cc") == ShouldParse::Yes);
783       REQUIRE(check("aa.cc") == ShouldParse::Yes);
784       REQUIRE(check("aa.cc") == ShouldParse::Yes);
785       timestamp_manager.UpdateCachedModificationTime("aa.cc", timestamp);
786       REQUIRE(check("aa.cc") == ShouldParse::No);
787     };
788     check_timestamp_change(5);
789     check_timestamp_change(6);
790     check_timestamp_change(5);
791     check_timestamp_change(4);
792 
793     // Argument change implies reimport, even if timestamp has not changed.
794     timestamp_manager.UpdateCachedModificationTime("aa.cc", 5);
795     modification_timestamp_fetcher.entries["aa.cc"] = 5;
796     REQUIRE(check("aa.cc", false /*is_dependency*/, false /*is_interactive*/,
797                   {"b"} /*old_args*/,
798                   {"b", "a"} /*new_args*/) == ShouldParse::Yes);
799   }
800 
801   // FIXME: validate other state like timestamp_manager, etc.
802   // FIXME: add more interesting tests that are not the happy path
803   // FIXME: test
804   //   - IndexMain_DoCreateIndexUpdate
805   //   - QueryDb_ImportMain
806 
807   TEST_CASE_FIXTURE(Fixture, "index request with zero results") {
808     indexer = IIndexer::MakeTestIndexer({IIndexer::TestEntry{"foo.cc", 0}});
809 
810     MakeRequest("foo.cc");
811 
812     REQUIRE(queue->index_request.Size() == 1);
813     REQUIRE(queue->do_id_map.Size() == 0);
814     PumpOnce();
815     REQUIRE(queue->index_request.Size() == 0);
816     REQUIRE(queue->do_id_map.Size() == 0);
817 
818     REQUIRE(file_consumer_shared.used_files.empty());
819   }
820 
821   TEST_CASE_FIXTURE(Fixture, "one index request") {
822     indexer = IIndexer::MakeTestIndexer({IIndexer::TestEntry{"foo.cc", 100}});
823 
824     MakeRequest("foo.cc");
825 
826     REQUIRE(queue->index_request.Size() == 1);
827     REQUIRE(queue->do_id_map.Size() == 0);
828     PumpOnce();
829     REQUIRE(queue->index_request.Size() == 0);
830     REQUIRE(queue->do_id_map.Size() == 100);
831 
832     REQUIRE(file_consumer_shared.used_files.empty());
833   }
834 
835   TEST_CASE_FIXTURE(Fixture, "multiple index requests") {
836     indexer = IIndexer::MakeTestIndexer(
837         {IIndexer::TestEntry{"foo.cc", 100}, IIndexer::TestEntry{"bar.cc", 5}});
838 
839     MakeRequest("foo.cc");
840     MakeRequest("bar.cc");
841 
842     REQUIRE(queue->index_request.Size() == 2);
843     REQUIRE(queue->do_id_map.Size() == 0);
844     while (PumpOnce()) {
845     }
846     REQUIRE(queue->index_request.Size() == 0);
847     REQUIRE(queue->do_id_map.Size() == 105);
848 
849     REQUIRE(file_consumer_shared.used_files.empty());
850   }
851 }
852