1 //===--- Client.cpp ----------------------------------------------*- C++-*-===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include <grpc++/grpc++.h>
10 
11 #include "Client.h"
12 #include "Feature.h"
13 #include "Service.grpc.pb.h"
14 #include "index/Index.h"
15 #include "marshalling/Marshalling.h"
16 #include "support/Logger.h"
17 #include "support/Trace.h"
18 #include "llvm/ADT/SmallString.h"
19 #include "llvm/ADT/StringRef.h"
20 #include "llvm/Support/Error.h"
21 
22 #include <atomic>
23 #include <chrono>
24 #include <memory>
25 
26 namespace clang {
27 namespace clangd {
28 namespace remote {
29 namespace {
30 
toString(const grpc_connectivity_state & State)31 llvm::StringRef toString(const grpc_connectivity_state &State) {
32   switch (State) {
33   case GRPC_CHANNEL_IDLE:
34     return "idle";
35   case GRPC_CHANNEL_CONNECTING:
36     return "connecting";
37   case GRPC_CHANNEL_READY:
38     return "ready";
39   case GRPC_CHANNEL_TRANSIENT_FAILURE:
40     return "transient failure";
41   case GRPC_CHANNEL_SHUTDOWN:
42     return "shutdown";
43   }
44   llvm_unreachable("Not a valid grpc_connectivity_state.");
45 }
46 
47 class IndexClient : public clangd::SymbolIndex {
updateConnectionStatus() const48   void updateConnectionStatus() const {
49     auto NewStatus = Channel->GetState(/*try_to_connect=*/false);
50     auto OldStatus = ConnectionStatus.exchange(NewStatus);
51     if (OldStatus != NewStatus)
52       vlog("Remote index connection [{0}]: {1} => {2}", ServerAddress,
53            toString(OldStatus), toString(NewStatus));
54   }
55 
56   template <typename RequestT, typename ReplyT>
57   using StreamingCall = std::unique_ptr<grpc::ClientReader<ReplyT>> (
58       remote::v1::SymbolIndex::Stub::*)(grpc::ClientContext *,
59                                         const RequestT &);
60 
61   template <typename RequestT, typename ReplyT, typename ClangdRequestT,
62             typename CallbackT>
streamRPC(ClangdRequestT Request,StreamingCall<RequestT,ReplyT> RPCCall,CallbackT Callback) const63   bool streamRPC(ClangdRequestT Request,
64                  StreamingCall<RequestT, ReplyT> RPCCall,
65                  CallbackT Callback) const {
66     updateConnectionStatus();
67     // We initialize to true because stream might be broken before we see the
68     // final message. In such a case there are actually more results on the
69     // stream, but we couldn't get to them.
70     bool HasMore = true;
71     trace::Span Tracer(RequestT::descriptor()->name());
72     const auto RPCRequest = ProtobufMarshaller->toProtobuf(Request);
73     SPAN_ATTACH(Tracer, "Request", RPCRequest.DebugString());
74     grpc::ClientContext Context;
75     Context.AddMetadata("version", versionString());
76     Context.AddMetadata("features", featureString());
77     Context.AddMetadata("platform", platformString());
78     std::chrono::system_clock::time_point StartTime =
79         std::chrono::system_clock::now();
80     auto Deadline = StartTime + DeadlineWaitingTime;
81     Context.set_deadline(Deadline);
82     auto Reader = (Stub.get()->*RPCCall)(&Context, RPCRequest);
83     dlog("Sending {0}: {1}", RequestT::descriptor()->name(),
84          RPCRequest.DebugString());
85     ReplyT Reply;
86     unsigned Successful = 0;
87     unsigned FailedToParse = 0;
88     while (Reader->Read(&Reply)) {
89       if (!Reply.has_stream_result()) {
90         HasMore = Reply.final_result().has_more();
91         continue;
92       }
93       auto Response = ProtobufMarshaller->fromProtobuf(Reply.stream_result());
94       if (!Response) {
95         elog("Received invalid {0}: {1}. Reason: {2}",
96              ReplyT::descriptor()->name(), Reply.stream_result().DebugString(),
97              Response.takeError());
98         ++FailedToParse;
99         continue;
100       }
101       Callback(*Response);
102       ++Successful;
103     }
104     auto Millis = std::chrono::duration_cast<std::chrono::milliseconds>(
105                       std::chrono::system_clock::now() - StartTime)
106                       .count();
107     vlog("Remote index [{0}]: {1} => {2} results in {3}ms.", ServerAddress,
108          RequestT::descriptor()->name(), Successful, Millis);
109     SPAN_ATTACH(Tracer, "Status", Reader->Finish().ok());
110     SPAN_ATTACH(Tracer, "Successful", Successful);
111     SPAN_ATTACH(Tracer, "Failed to parse", FailedToParse);
112     updateConnectionStatus();
113     return HasMore;
114   }
115 
116 public:
IndexClient(std::shared_ptr<grpc::Channel> Channel,llvm::StringRef Address,llvm::StringRef ProjectRoot,std::chrono::milliseconds DeadlineTime=std::chrono::milliseconds (1000))117   IndexClient(
118       std::shared_ptr<grpc::Channel> Channel, llvm::StringRef Address,
119       llvm::StringRef ProjectRoot,
120       std::chrono::milliseconds DeadlineTime = std::chrono::milliseconds(1000))
121       : Stub(remote::v1::SymbolIndex::NewStub(Channel)), Channel(Channel),
122         ServerAddress(Address),
123         ConnectionStatus(Channel->GetState(/*try_to_connect=*/true)),
124         ProtobufMarshaller(new Marshaller(/*RemoteIndexRoot=*/"",
125                                           /*LocalIndexRoot=*/ProjectRoot)),
126         DeadlineWaitingTime(DeadlineTime) {
127     assert(!ProjectRoot.empty());
128   }
129 
lookup(const clangd::LookupRequest & Request,llvm::function_ref<void (const clangd::Symbol &)> Callback) const130   void lookup(const clangd::LookupRequest &Request,
131               llvm::function_ref<void(const clangd::Symbol &)> Callback)
132       const override {
133     streamRPC(Request, &remote::v1::SymbolIndex::Stub::Lookup, Callback);
134   }
135 
fuzzyFind(const clangd::FuzzyFindRequest & Request,llvm::function_ref<void (const clangd::Symbol &)> Callback) const136   bool fuzzyFind(const clangd::FuzzyFindRequest &Request,
137                  llvm::function_ref<void(const clangd::Symbol &)> Callback)
138       const override {
139     return streamRPC(Request, &remote::v1::SymbolIndex::Stub::FuzzyFind,
140                      Callback);
141   }
142 
143   bool
refs(const clangd::RefsRequest & Request,llvm::function_ref<void (const clangd::Ref &)> Callback) const144   refs(const clangd::RefsRequest &Request,
145        llvm::function_ref<void(const clangd::Ref &)> Callback) const override {
146     return streamRPC(Request, &remote::v1::SymbolIndex::Stub::Refs, Callback);
147   }
148 
149   void
relations(const clangd::RelationsRequest & Request,llvm::function_ref<void (const SymbolID &,const clangd::Symbol &)> Callback) const150   relations(const clangd::RelationsRequest &Request,
151             llvm::function_ref<void(const SymbolID &, const clangd::Symbol &)>
152                 Callback) const override {
153     streamRPC(Request, &remote::v1::SymbolIndex::Stub::Relations,
154               // Unpack protobuf Relation.
155               [&](std::pair<SymbolID, clangd::Symbol> SubjectAndObject) {
156                 Callback(SubjectAndObject.first, SubjectAndObject.second);
157               });
158   }
159 
160   llvm::unique_function<IndexContents(llvm::StringRef) const>
indexedFiles() const161   indexedFiles() const override {
162     // FIXME: For now we always return IndexContents::None regardless of whether
163     //        the file was indexed or not. A possible implementation could be
164     //        based on the idea that we do not want to send a request at every
165     //        call of a function returned by IndexClient::indexedFiles().
166     return [](llvm::StringRef) { return IndexContents::None; };
167   }
168 
169   // IndexClient does not take any space since the data is stored on the
170   // server.
estimateMemoryUsage() const171   size_t estimateMemoryUsage() const override { return 0; }
172 
173 private:
174   std::unique_ptr<remote::v1::SymbolIndex::Stub> Stub;
175   std::shared_ptr<grpc::Channel> Channel;
176   llvm::SmallString<256> ServerAddress;
177   mutable std::atomic<grpc_connectivity_state> ConnectionStatus;
178   std::unique_ptr<Marshaller> ProtobufMarshaller;
179   // Each request will be terminated if it takes too long.
180   std::chrono::milliseconds DeadlineWaitingTime;
181 };
182 
183 } // namespace
184 
getClient(llvm::StringRef Address,llvm::StringRef ProjectRoot)185 std::unique_ptr<clangd::SymbolIndex> getClient(llvm::StringRef Address,
186                                                llvm::StringRef ProjectRoot) {
187   const auto Channel =
188       grpc::CreateChannel(Address.str(), grpc::InsecureChannelCredentials());
189   return std::unique_ptr<clangd::SymbolIndex>(
190       new IndexClient(Channel, Address, ProjectRoot));
191 }
192 
193 } // namespace remote
194 } // namespace clangd
195 } // namespace clang
196